Observable.sample;

This commit is contained in:
bjorn 2016-01-09 16:07:23 -08:00
parent 6d6cc32da2
commit 72a7b98cc3
6 changed files with 145 additions and 0 deletions

View File

@ -55,6 +55,7 @@ RxLua
- [reduce](#reduceaccumulator-seed) - [reduce](#reduceaccumulator-seed)
- [reject](#rejectpredicate) - [reject](#rejectpredicate)
- [retry](#retrycount) - [retry](#retrycount)
- [sample](#samplesampler)
- [scan](#scanaccumulator-seed) - [scan](#scanaccumulator-seed)
- [skip](#skipn) - [skip](#skipn)
- [skipLast](#skiplastcount) - [skipLast](#skiplastcount)
@ -583,6 +584,16 @@ Returns an Observable that restarts in the event of an error.
--- ---
#### `:sample(sampler)`
Returns a new Observable that produces its most recent value every time the specified observable produces a value.
| Name | Type | Default | Description |
|------|------|---------|-------------|
| `sampler` | Observable | | The Observable that is used to sample values from this Observable. |
---
#### `:scan(accumulator, seed)` #### `:scan(accumulator, seed)`
Returns a new Observable that produces values computed by accumulating the results of running a function on each value produced by the original Observable. Returns a new Observable that produces values computed by accumulating the results of running a function on each value produced by the original Observable.

36
rx.lua
View File

@ -1190,6 +1190,42 @@ function Observable:retry(count)
end) end)
end end
--- Returns a new Observable that produces its most recent value every time the specified observable
-- produces a value.
-- @arg {Observable} sampler - The Observable that is used to sample values from this Observable.
-- @returns {Observable}
function Observable:sample(sampler)
if not sampler then error('Expected an Observable') end
return Observable.create(function(observer)
local latest = {}
local function setLatest(...)
latest = util.pack(...)
end
local function onNext()
return observer:onNext(util.unpack(latest))
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
local sourceSubscription = self:subscribe(setLatest, onError)
local sampleSubscription = sampler:subscribe(onNext, onError, onCompleted)
return Subscription.create(function()
if sourceSubscription then sourceSubscription:unsubscribe() end
if sampleSubscription then sampleSubscription:unsubscribe() end
end)
end)
end
--- Returns a new Observable that produces values computed by accumulating the results of running a --- Returns a new Observable that produces values computed by accumulating the results of running a
-- function on each value produced by the original Observable. -- function on each value produced by the original Observable.
-- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed

38
src/operators/sample.lua Normal file
View File

@ -0,0 +1,38 @@
local Observable = require 'observable'
local util = require 'util'
--- Returns a new Observable that produces its most recent value every time the specified observable
-- produces a value.
-- @arg {Observable} sampler - The Observable that is used to sample values from this Observable.
-- @returns {Observable}
function Observable:sample(sampler)
if not sampler then error('Expected an Observable') end
return Observable.create(function(observer)
local latest = {}
local function setLatest(...)
latest = util.pack(...)
end
local function onNext()
return observer:onNext(util.unpack(latest))
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
local sourceSubscription = self:subscribe(setLatest, onError)
local sampleSubscription = sampler:subscribe(onNext, onError, onCompleted)
return Subscription.create(function()
if sourceSubscription then sourceSubscription:unsubscribe() end
if sampleSubscription then sampleSubscription:unsubscribe() end
end)
end)
end

View File

@ -260,6 +260,7 @@ describe('Observable', function()
dofile('tests/reduce.lua') dofile('tests/reduce.lua')
dofile('tests/reject.lua') dofile('tests/reject.lua')
dofile('tests/retry.lua') dofile('tests/retry.lua')
dofile('tests/sample.lua')
dofile('tests/scan.lua') dofile('tests/scan.lua')
dofile('tests/skip.lua') dofile('tests/skip.lua')
dofile('tests/skipLast.lua') dofile('tests/skipLast.lua')

58
tests/sample.lua Normal file
View File

@ -0,0 +1,58 @@
describe('sample', function()
it('errors if no sampler is specified', function()
expect(function() Rx.Observable.empty():sample() end).to.fail()
end)
it('produces nil values if the sampler fires before the source does', function()
local sampler = Rx.Observable.fromRange(3)
local onNext, onError, onCompleted = observableSpy(Rx.Observable.empty():sample(sampler))
expect(#onNext).to.equal(3)
expect(next(onNext[1])).to_not.exist()
expect(next(onNext[2])).to_not.exist()
expect(next(onNext[3])).to_not.exist()
end)
it('produces the latest value produced by the source when the sampler fires', function()
local a = Rx.Subject.create()
local b = Rx.Subject.create()
local onNext, onError, onCompleted = observableSpy(a:sample(b))
a:onNext(1)
b:onNext('a')
b:onNext('b')
a:onNext(2)
a:onNext(3)
b:onNext('c')
a:onCompleted()
b:onCompleted()
expect(onNext).to.equal({{1}, {1}, {3}})
end)
it('completes when the sampler completes', function()
local a = Rx.Subject.create()
local b = Rx.Subject.create()
local onNext, onError, onCompleted = observableSpy(a:sample(b))
a:onNext(1)
a:onCompleted()
b:onNext('a')
b:onNext('b')
b:onNext('c')
expect(#onCompleted).to.equal(0)
b:onCompleted()
expect(#onCompleted).to.equal(1)
expect(onNext).to.equal({{1}, {1}, {1}})
end)
it('errors when the source errors', function()
local a = Rx.Observable.throw()
local b = Rx.Observable.fromRange(3)
local onNext, onError, onCompleted = observableSpy(a:sample(b))
expect(#onError).to.equal(1)
end)
it('errors when the sampler errors', function()
local a = Rx.Observable.fromRange(3)
local b = Rx.Observable.throw()
local onNext, onError, onCompleted = observableSpy(a:sample(b))
expect(#onError).to.equal(1)
end)
end)

View File

@ -41,6 +41,7 @@ local files = {
'src/operators/reduce.lua', 'src/operators/reduce.lua',
'src/operators/reject.lua', 'src/operators/reject.lua',
'src/operators/retry.lua', 'src/operators/retry.lua',
'src/operators/sample.lua',
'src/operators/scan.lua', 'src/operators/scan.lua',
'src/operators/skip.lua', 'src/operators/skip.lua',
'src/operators/skipLast.lua', 'src/operators/skipLast.lua',