Fix Observable.delay;

This commit is contained in:
bjorn 2015-12-18 13:58:41 -08:00
parent b1ff4dfd15
commit 16a0336dbe
3 changed files with 49 additions and 7 deletions

View File

@ -33,6 +33,7 @@ RxLua
- [contains](#containsvalue)
- [count](#countpredicate)
- [defaultIfEmpty](#defaultifemptyvalues)
- [delay](#delaytime-scheduler)
- [distinct](#distinct)
- [distinctUntilChanged](#distinctuntilchangedcomparator)
- [elementAt](#elementatindex)
@ -392,6 +393,17 @@ Returns a new Observable that produces a default set of items if the source Obse
---
#### `:delay(time, scheduler)`
Returns a new Observable that produces the values of the original delayed by a time period.
| Name | Type | Default | Description |
|------|------|---------|-------------|
| `time` | number|function | | An amount in milliseconds to delay by, or a function which returns this value. |
| `scheduler` | Scheduler | | The scheduler to run the Observable on. |
---
#### `:distinct()`
Returns a new Observable that produces the values from the original with duplicates removed.

36
rx.lua
View File

@ -604,6 +604,31 @@ function Observable:count(predicate)
end)
end
function Observable:debounce(time, scheduler)
time = time or 0
return Observable.create(function(observer)
local function debounce(key)
local debounced
return function(...)
local value = util.pack(...)
if debounced then
debounced:unsubscribe()
end
local values = util.pack(...)
debounced = scheduler:schedule(function()
return observer[key](observer, util.unpack(values))
end, time)
end
end
local subscription = self:subscribe(debounce('onNext'), debounce('onError'), debounce('onCompleted'))
end)
end
--- Returns a new Observable that produces a default set of items if the source Observable produces
-- no values.
-- @arg {*...} values - Zero or more values to produce if the source completes without emitting
@ -636,8 +661,13 @@ function Observable:defaultIfEmpty(...)
end)
end
--- Returns a new Observable that produces the values of the original delayed by a time period.
-- @arg {number|function} time - An amount in milliseconds to delay by, or a function which returns
-- this value.
-- @arg {Scheduler} scheduler - The scheduler to run the Observable on.
-- @returns {Observable}
function Observable:delay(time, scheduler)
local time = type(time) ~= 'function' and util.constant(time) or time
time = type(time) ~= 'function' and util.constant(time) or time
return Observable.create(function(observer)
local actions = {}
@ -1807,10 +1837,10 @@ end
-- @arg {function} action - The action to run.
-- @arg {number=0} delay - The delay, in milliseconds.
-- @returns {Subscription}
function TimeoutScheduler:schedule(action, delay)
function TimeoutScheduler:schedule(action, delay, ...)
local timer = require 'timer'
local subscription
local handle = timer.setTimeout(delay, action)
local handle = timer.setTimeout(delay, action, ...)
return Subscription.create(function()
timer.clearTimeout(handle)
end)

View File

@ -3,12 +3,12 @@ local Subscription = require 'Subscription'
local util = require 'util'
--- Returns a new Observable that produces the values of the original delayed by a time period.
-- @arg {number|function} delay - An amount in milliseconds to delay by, or a function which returns
-- @arg {number|function} time - An amount in milliseconds to delay by, or a function which returns
-- this value.
-- @arg {Scheduler} scheduler - The scheduler to run the Observable on.
-- @returns {Observable}
function Observable:delay(delay, scheduler)
delay = type(delay) ~= 'function' and util.constant(delay) or delay
function Observable:delay(time, scheduler)
time = type(time) ~= 'function' and util.constant(time) or time
return Observable.create(function(observer)
local actions = {}
@ -18,7 +18,7 @@ function Observable:delay(delay, scheduler)
local arg = util.pack(...)
local handle = scheduler:schedule(function()
observer[key](observer, util.unpack(arg))
end, delay())
end, time())
table.insert(actions, handle)
end
end