commit
ea19fbe6c2
@ -55,6 +55,7 @@ RxLua
|
|||||||
- [reduce](#reduceaccumulator-seed)
|
- [reduce](#reduceaccumulator-seed)
|
||||||
- [reject](#rejectpredicate)
|
- [reject](#rejectpredicate)
|
||||||
- [retry](#retrycount)
|
- [retry](#retrycount)
|
||||||
|
- [sample](#samplesampler)
|
||||||
- [scan](#scanaccumulator-seed)
|
- [scan](#scanaccumulator-seed)
|
||||||
- [skip](#skipn)
|
- [skip](#skipn)
|
||||||
- [skipLast](#skiplastcount)
|
- [skipLast](#skiplastcount)
|
||||||
@ -583,6 +584,16 @@ Returns an Observable that restarts in the event of an error.
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
#### `:sample(sampler)`
|
||||||
|
|
||||||
|
Returns a new Observable that produces its most recent value every time the specified observable produces a value.
|
||||||
|
|
||||||
|
| Name | Type | Default | Description |
|
||||||
|
|------|------|---------|-------------|
|
||||||
|
| `sampler` | Observable | | The Observable that is used to sample values from this Observable. |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
#### `:scan(accumulator, seed)`
|
#### `:scan(accumulator, seed)`
|
||||||
|
|
||||||
Returns a new Observable that produces values computed by accumulating the results of running a function on each value produced by the original Observable.
|
Returns a new Observable that produces values computed by accumulating the results of running a function on each value produced by the original Observable.
|
||||||
|
36
rx.lua
36
rx.lua
@ -1190,6 +1190,42 @@ function Observable:retry(count)
|
|||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
--- Returns a new Observable that produces its most recent value every time the specified observable
|
||||||
|
-- produces a value.
|
||||||
|
-- @arg {Observable} sampler - The Observable that is used to sample values from this Observable.
|
||||||
|
-- @returns {Observable}
|
||||||
|
function Observable:sample(sampler)
|
||||||
|
if not sampler then error('Expected an Observable') end
|
||||||
|
|
||||||
|
return Observable.create(function(observer)
|
||||||
|
local latest = {}
|
||||||
|
|
||||||
|
local function setLatest(...)
|
||||||
|
latest = util.pack(...)
|
||||||
|
end
|
||||||
|
|
||||||
|
local function onNext()
|
||||||
|
return observer:onNext(util.unpack(latest))
|
||||||
|
end
|
||||||
|
|
||||||
|
local function onError(message)
|
||||||
|
return observer:onError(message)
|
||||||
|
end
|
||||||
|
|
||||||
|
local function onCompleted()
|
||||||
|
return observer:onCompleted()
|
||||||
|
end
|
||||||
|
|
||||||
|
local sourceSubscription = self:subscribe(setLatest, onError)
|
||||||
|
local sampleSubscription = sampler:subscribe(onNext, onError, onCompleted)
|
||||||
|
|
||||||
|
return Subscription.create(function()
|
||||||
|
if sourceSubscription then sourceSubscription:unsubscribe() end
|
||||||
|
if sampleSubscription then sampleSubscription:unsubscribe() end
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
--- Returns a new Observable that produces values computed by accumulating the results of running a
|
--- Returns a new Observable that produces values computed by accumulating the results of running a
|
||||||
-- function on each value produced by the original Observable.
|
-- function on each value produced by the original Observable.
|
||||||
-- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
|
-- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
|
||||||
|
38
src/operators/sample.lua
Normal file
38
src/operators/sample.lua
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
local Observable = require 'observable'
|
||||||
|
local util = require 'util'
|
||||||
|
|
||||||
|
--- Returns a new Observable that produces its most recent value every time the specified observable
|
||||||
|
-- produces a value.
|
||||||
|
-- @arg {Observable} sampler - The Observable that is used to sample values from this Observable.
|
||||||
|
-- @returns {Observable}
|
||||||
|
function Observable:sample(sampler)
|
||||||
|
if not sampler then error('Expected an Observable') end
|
||||||
|
|
||||||
|
return Observable.create(function(observer)
|
||||||
|
local latest = {}
|
||||||
|
|
||||||
|
local function setLatest(...)
|
||||||
|
latest = util.pack(...)
|
||||||
|
end
|
||||||
|
|
||||||
|
local function onNext()
|
||||||
|
return observer:onNext(util.unpack(latest))
|
||||||
|
end
|
||||||
|
|
||||||
|
local function onError(message)
|
||||||
|
return observer:onError(message)
|
||||||
|
end
|
||||||
|
|
||||||
|
local function onCompleted()
|
||||||
|
return observer:onCompleted()
|
||||||
|
end
|
||||||
|
|
||||||
|
local sourceSubscription = self:subscribe(setLatest, onError)
|
||||||
|
local sampleSubscription = sampler:subscribe(onNext, onError, onCompleted)
|
||||||
|
|
||||||
|
return Subscription.create(function()
|
||||||
|
if sourceSubscription then sourceSubscription:unsubscribe() end
|
||||||
|
if sampleSubscription then sampleSubscription:unsubscribe() end
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
end
|
@ -260,6 +260,7 @@ describe('Observable', function()
|
|||||||
dofile('tests/reduce.lua')
|
dofile('tests/reduce.lua')
|
||||||
dofile('tests/reject.lua')
|
dofile('tests/reject.lua')
|
||||||
dofile('tests/retry.lua')
|
dofile('tests/retry.lua')
|
||||||
|
dofile('tests/sample.lua')
|
||||||
dofile('tests/scan.lua')
|
dofile('tests/scan.lua')
|
||||||
dofile('tests/skip.lua')
|
dofile('tests/skip.lua')
|
||||||
dofile('tests/skipLast.lua')
|
dofile('tests/skipLast.lua')
|
||||||
|
58
tests/sample.lua
Normal file
58
tests/sample.lua
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
describe('sample', function()
|
||||||
|
it('errors if no sampler is specified', function()
|
||||||
|
expect(function() Rx.Observable.empty():sample() end).to.fail()
|
||||||
|
end)
|
||||||
|
|
||||||
|
it('produces nil values if the sampler fires before the source does', function()
|
||||||
|
local sampler = Rx.Observable.fromRange(3)
|
||||||
|
local onNext, onError, onCompleted = observableSpy(Rx.Observable.empty():sample(sampler))
|
||||||
|
expect(#onNext).to.equal(3)
|
||||||
|
expect(next(onNext[1])).to_not.exist()
|
||||||
|
expect(next(onNext[2])).to_not.exist()
|
||||||
|
expect(next(onNext[3])).to_not.exist()
|
||||||
|
end)
|
||||||
|
|
||||||
|
it('produces the latest value produced by the source when the sampler fires', function()
|
||||||
|
local a = Rx.Subject.create()
|
||||||
|
local b = Rx.Subject.create()
|
||||||
|
local onNext, onError, onCompleted = observableSpy(a:sample(b))
|
||||||
|
a:onNext(1)
|
||||||
|
b:onNext('a')
|
||||||
|
b:onNext('b')
|
||||||
|
a:onNext(2)
|
||||||
|
a:onNext(3)
|
||||||
|
b:onNext('c')
|
||||||
|
a:onCompleted()
|
||||||
|
b:onCompleted()
|
||||||
|
expect(onNext).to.equal({{1}, {1}, {3}})
|
||||||
|
end)
|
||||||
|
|
||||||
|
it('completes when the sampler completes', function()
|
||||||
|
local a = Rx.Subject.create()
|
||||||
|
local b = Rx.Subject.create()
|
||||||
|
local onNext, onError, onCompleted = observableSpy(a:sample(b))
|
||||||
|
a:onNext(1)
|
||||||
|
a:onCompleted()
|
||||||
|
b:onNext('a')
|
||||||
|
b:onNext('b')
|
||||||
|
b:onNext('c')
|
||||||
|
expect(#onCompleted).to.equal(0)
|
||||||
|
b:onCompleted()
|
||||||
|
expect(#onCompleted).to.equal(1)
|
||||||
|
expect(onNext).to.equal({{1}, {1}, {1}})
|
||||||
|
end)
|
||||||
|
|
||||||
|
it('errors when the source errors', function()
|
||||||
|
local a = Rx.Observable.throw()
|
||||||
|
local b = Rx.Observable.fromRange(3)
|
||||||
|
local onNext, onError, onCompleted = observableSpy(a:sample(b))
|
||||||
|
expect(#onError).to.equal(1)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it('errors when the sampler errors', function()
|
||||||
|
local a = Rx.Observable.fromRange(3)
|
||||||
|
local b = Rx.Observable.throw()
|
||||||
|
local onNext, onError, onCompleted = observableSpy(a:sample(b))
|
||||||
|
expect(#onError).to.equal(1)
|
||||||
|
end)
|
||||||
|
end)
|
@ -41,6 +41,7 @@ local files = {
|
|||||||
'src/operators/reduce.lua',
|
'src/operators/reduce.lua',
|
||||||
'src/operators/reject.lua',
|
'src/operators/reject.lua',
|
||||||
'src/operators/retry.lua',
|
'src/operators/retry.lua',
|
||||||
|
'src/operators/sample.lua',
|
||||||
'src/operators/scan.lua',
|
'src/operators/scan.lua',
|
||||||
'src/operators/skip.lua',
|
'src/operators/skip.lua',
|
||||||
'src/operators/skipLast.lua',
|
'src/operators/skipLast.lua',
|
||||||
|
Loading…
x
Reference in New Issue
Block a user