Observable.delay;

This commit is contained in:
bjorn 2015-11-21 20:44:14 -08:00
parent edc5c25532
commit b1ff4dfd15
4 changed files with 89 additions and 2 deletions

View File

@ -80,6 +80,9 @@ RxLua
- [schedule](#scheduleaction-delay) - [schedule](#scheduleaction-delay)
- [update](#updatedelta) - [update](#updatedelta)
- [isEmpty](#isempty) - [isEmpty](#isempty)
- [TimeoutScheduler](#timeoutscheduler)
- [create](#create)
- [schedule](#scheduleaction-delay)
- [Subject](#subject) - [Subject](#subject)
- [create](#create) - [create](#create)
- [subscribe](#subscribeonnext-onerror-oncompleted) - [subscribe](#subscribeonnext-onerror-oncompleted)
@ -771,12 +774,12 @@ Creates a new CooperativeScheduler.
#### `:schedule(action, delay)` #### `:schedule(action, delay)`
Schedules a function to be run after an optional delay. Schedules a function to be run after an optional delay. Returns a subscription that will stop the action from running.
| Name | Type | Default | Description | | Name | Type | Default | Description |
|------|------|---------|-------------| |------|------|---------|-------------|
| `action` | function | | The function to execute. Will be converted into a coroutine. The coroutine may yield execution back to the scheduler with an optional number, which will put it to sleep for a time period. | | `action` | function | | The function to execute. Will be converted into a coroutine. The coroutine may yield execution back to the scheduler with an optional number, which will put it to sleep for a time period. |
| `delay` | number (optional) | 0 | Delay execution of the action by a time period. | | `delay` | number (optional) | 0 | Delay execution of the action by a virtual time period. |
--- ---
@ -794,6 +797,27 @@ Triggers an update of the CooperativeScheduler. The clock will be advanced and t
Returns whether or not the CooperativeScheduler's queue is empty. Returns whether or not the CooperativeScheduler's queue is empty.
# TimeoutScheduler
A scheduler that uses luvit's timer library to schedule events on an event loop.
---
#### `.create()`
Creates a new TimeoutScheduler.
---
#### `:schedule(action, delay)`
Schedules an action to run at a future point in time.
| Name | Type | Default | Description |
|------|------|---------|-------------|
| `action` | function | | The action to run. |
| `delay` | number (optional) | 0 | The delay, in milliseconds. |
# Subject # Subject
Subjects function both as an Observer and as an Observable. Subjects inherit all Observable functions, including subscribe. Values can also be pushed to the Subject, which will be broadcasted to any subscribed Observers. Subjects function both as an Observer and as an Observable. Subjects inherit all Observable functions, including subscribe. Values can also be pushed to the Subject, which will be broadcasted to any subscribed Observers.

27
rx.lua
View File

@ -636,6 +636,33 @@ function Observable:defaultIfEmpty(...)
end) end)
end end
function Observable:delay(time, scheduler)
local time = type(time) ~= 'function' and util.constant(time) or time
return Observable.create(function(observer)
local actions = {}
local function delay(key)
return function(...)
local arg = util.pack(...)
local handle = scheduler:schedule(function()
observer[key](observer, util.unpack(arg))
end, time())
table.insert(actions, handle)
end
end
local subscription = self:subscribe(delay('onNext'), delay('onError'), delay('onCompleted'))
return Subscription.create(function()
if subscription then subscription:unsubscribe() end
for i = 1, #actions do
actions[i]:unsubscribe()
end
end)
end)
end
--- Returns a new Observable that produces the values from the original with duplicates removed. --- Returns a new Observable that produces the values from the original with duplicates removed.
-- @returns {Observable} -- @returns {Observable}
function Observable:distinct() function Observable:distinct()

35
src/operators/delay.lua Normal file
View File

@ -0,0 +1,35 @@
local Observable = require 'Observable'
local Subscription = require 'Subscription'
local util = require 'util'
--- Returns a new Observable that produces the values of the original delayed by a time period.
-- @arg {number|function} delay - An amount in milliseconds to delay by, or a function which returns
-- this value.
-- @arg {Scheduler} scheduler - The scheduler to run the Observable on.
-- @returns {Observable}
function Observable:delay(delay, scheduler)
delay = type(delay) ~= 'function' and util.constant(delay) or delay
return Observable.create(function(observer)
local actions = {}
local function delay(key)
return function(...)
local arg = util.pack(...)
local handle = scheduler:schedule(function()
observer[key](observer, util.unpack(arg))
end, delay())
table.insert(actions, handle)
end
end
local subscription = self:subscribe(delay('onNext'), delay('onError'), delay('onCompleted'))
return Subscription.create(function()
if subscription then subscription:unsubscribe() end
for i = 1, #actions do
actions[i]:unsubscribe()
end
end)
end)
end

View File

@ -18,6 +18,7 @@ local files = {
'src/operators/contains.lua', 'src/operators/contains.lua',
'src/operators/count.lua', 'src/operators/count.lua',
'src/operators/defaultIfEmpty.lua', 'src/operators/defaultIfEmpty.lua',
'src/operators/delay.lua',
'src/operators/distinct.lua', 'src/operators/distinct.lua',
'src/operators/distinctUntilChanged.lua', 'src/operators/distinctUntilChanged.lua',
'src/operators/elementAt.lua', 'src/operators/elementAt.lua',