From b1ff4dfd1533aa048c7f4540080841cd501f194c Mon Sep 17 00:00:00 2001 From: bjorn Date: Sat, 21 Nov 2015 20:44:14 -0800 Subject: [PATCH] Observable.delay; --- doc/README.md | 28 ++++++++++++++++++++++++++-- rx.lua | 27 +++++++++++++++++++++++++++ src/operators/delay.lua | 35 +++++++++++++++++++++++++++++++++++ tools/build.lua | 1 + 4 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 src/operators/delay.lua diff --git a/doc/README.md b/doc/README.md index 3d295b2..ae204c8 100644 --- a/doc/README.md +++ b/doc/README.md @@ -80,6 +80,9 @@ RxLua - [schedule](#scheduleaction-delay) - [update](#updatedelta) - [isEmpty](#isempty) +- [TimeoutScheduler](#timeoutscheduler) + - [create](#create) + - [schedule](#scheduleaction-delay) - [Subject](#subject) - [create](#create) - [subscribe](#subscribeonnext-onerror-oncompleted) @@ -771,12 +774,12 @@ Creates a new CooperativeScheduler. #### `:schedule(action, delay)` -Schedules a function to be run after an optional delay. +Schedules a function to be run after an optional delay. Returns a subscription that will stop the action from running. | Name | Type | Default | Description | |------|------|---------|-------------| | `action` | function | | The function to execute. Will be converted into a coroutine. The coroutine may yield execution back to the scheduler with an optional number, which will put it to sleep for a time period. | -| `delay` | number (optional) | 0 | Delay execution of the action by a time period. | +| `delay` | number (optional) | 0 | Delay execution of the action by a virtual time period. | --- @@ -794,6 +797,27 @@ Triggers an update of the CooperativeScheduler. The clock will be advanced and t Returns whether or not the CooperativeScheduler's queue is empty. +# TimeoutScheduler + +A scheduler that uses luvit's timer library to schedule events on an event loop. + +--- + +#### `.create()` + +Creates a new TimeoutScheduler. + +--- + +#### `:schedule(action, delay)` + +Schedules an action to run at a future point in time. + +| Name | Type | Default | Description | +|------|------|---------|-------------| +| `action` | function | | The action to run. | +| `delay` | number (optional) | 0 | The delay, in milliseconds. | + # Subject Subjects function both as an Observer and as an Observable. Subjects inherit all Observable functions, including subscribe. Values can also be pushed to the Subject, which will be broadcasted to any subscribed Observers. diff --git a/rx.lua b/rx.lua index 22c6426..189e5a4 100644 --- a/rx.lua +++ b/rx.lua @@ -636,6 +636,33 @@ function Observable:defaultIfEmpty(...) end) end +function Observable:delay(time, scheduler) + local time = type(time) ~= 'function' and util.constant(time) or time + + return Observable.create(function(observer) + local actions = {} + + local function delay(key) + return function(...) + local arg = util.pack(...) + local handle = scheduler:schedule(function() + observer[key](observer, util.unpack(arg)) + end, time()) + table.insert(actions, handle) + end + end + + local subscription = self:subscribe(delay('onNext'), delay('onError'), delay('onCompleted')) + + return Subscription.create(function() + if subscription then subscription:unsubscribe() end + for i = 1, #actions do + actions[i]:unsubscribe() + end + end) + end) +end + --- Returns a new Observable that produces the values from the original with duplicates removed. -- @returns {Observable} function Observable:distinct() diff --git a/src/operators/delay.lua b/src/operators/delay.lua new file mode 100644 index 0000000..2215e30 --- /dev/null +++ b/src/operators/delay.lua @@ -0,0 +1,35 @@ +local Observable = require 'Observable' +local Subscription = require 'Subscription' +local util = require 'util' + +--- Returns a new Observable that produces the values of the original delayed by a time period. +-- @arg {number|function} delay - An amount in milliseconds to delay by, or a function which returns +-- this value. +-- @arg {Scheduler} scheduler - The scheduler to run the Observable on. +-- @returns {Observable} +function Observable:delay(delay, scheduler) + delay = type(delay) ~= 'function' and util.constant(delay) or delay + + return Observable.create(function(observer) + local actions = {} + + local function delay(key) + return function(...) + local arg = util.pack(...) + local handle = scheduler:schedule(function() + observer[key](observer, util.unpack(arg)) + end, delay()) + table.insert(actions, handle) + end + end + + local subscription = self:subscribe(delay('onNext'), delay('onError'), delay('onCompleted')) + + return Subscription.create(function() + if subscription then subscription:unsubscribe() end + for i = 1, #actions do + actions[i]:unsubscribe() + end + end) + end) +end diff --git a/tools/build.lua b/tools/build.lua index 00af60e..c4706b4 100644 --- a/tools/build.lua +++ b/tools/build.lua @@ -18,6 +18,7 @@ local files = { 'src/operators/contains.lua', 'src/operators/count.lua', 'src/operators/defaultIfEmpty.lua', + 'src/operators/delay.lua', 'src/operators/distinct.lua', 'src/operators/distinctUntilChanged.lua', 'src/operators/elementAt.lua',