From 72a7b98cc360eac05efed1472e5ed920600a5e2c Mon Sep 17 00:00:00 2001 From: bjorn Date: Sat, 9 Jan 2016 16:07:23 -0800 Subject: [PATCH] Observable.sample; --- doc/README.md | 11 ++++++++ rx.lua | 36 +++++++++++++++++++++++++ src/operators/sample.lua | 38 ++++++++++++++++++++++++++ tests/observable.lua | 1 + tests/sample.lua | 58 ++++++++++++++++++++++++++++++++++++++++ tools/build.lua | 1 + 6 files changed, 145 insertions(+) create mode 100644 src/operators/sample.lua create mode 100644 tests/sample.lua diff --git a/doc/README.md b/doc/README.md index 30c0d47..1966afb 100644 --- a/doc/README.md +++ b/doc/README.md @@ -55,6 +55,7 @@ RxLua - [reduce](#reduceaccumulator-seed) - [reject](#rejectpredicate) - [retry](#retrycount) + - [sample](#samplesampler) - [scan](#scanaccumulator-seed) - [skip](#skipn) - [skipLast](#skiplastcount) @@ -583,6 +584,16 @@ Returns an Observable that restarts in the event of an error. --- +#### `:sample(sampler)` + +Returns a new Observable that produces its most recent value every time the specified observable produces a value. + +| Name | Type | Default | Description | +|------|------|---------|-------------| +| `sampler` | Observable | | The Observable that is used to sample values from this Observable. | + +--- + #### `:scan(accumulator, seed)` Returns a new Observable that produces values computed by accumulating the results of running a function on each value produced by the original Observable. diff --git a/rx.lua b/rx.lua index 9f9f665..cb8f46d 100644 --- a/rx.lua +++ b/rx.lua @@ -1190,6 +1190,42 @@ function Observable:retry(count) end) end +--- Returns a new Observable that produces its most recent value every time the specified observable +-- produces a value. +-- @arg {Observable} sampler - The Observable that is used to sample values from this Observable. +-- @returns {Observable} +function Observable:sample(sampler) + if not sampler then error('Expected an Observable') end + + return Observable.create(function(observer) + local latest = {} + + local function setLatest(...) + latest = util.pack(...) + end + + local function onNext() + return observer:onNext(util.unpack(latest)) + end + + local function onError(message) + return observer:onError(message) + end + + local function onCompleted() + return observer:onCompleted() + end + + local sourceSubscription = self:subscribe(setLatest, onError) + local sampleSubscription = sampler:subscribe(onNext, onError, onCompleted) + + return Subscription.create(function() + if sourceSubscription then sourceSubscription:unsubscribe() end + if sampleSubscription then sampleSubscription:unsubscribe() end + end) + end) +end + --- Returns a new Observable that produces values computed by accumulating the results of running a -- function on each value produced by the original Observable. -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed diff --git a/src/operators/sample.lua b/src/operators/sample.lua new file mode 100644 index 0000000..4ea39c0 --- /dev/null +++ b/src/operators/sample.lua @@ -0,0 +1,38 @@ +local Observable = require 'observable' +local util = require 'util' + +--- Returns a new Observable that produces its most recent value every time the specified observable +-- produces a value. +-- @arg {Observable} sampler - The Observable that is used to sample values from this Observable. +-- @returns {Observable} +function Observable:sample(sampler) + if not sampler then error('Expected an Observable') end + + return Observable.create(function(observer) + local latest = {} + + local function setLatest(...) + latest = util.pack(...) + end + + local function onNext() + return observer:onNext(util.unpack(latest)) + end + + local function onError(message) + return observer:onError(message) + end + + local function onCompleted() + return observer:onCompleted() + end + + local sourceSubscription = self:subscribe(setLatest, onError) + local sampleSubscription = sampler:subscribe(onNext, onError, onCompleted) + + return Subscription.create(function() + if sourceSubscription then sourceSubscription:unsubscribe() end + if sampleSubscription then sampleSubscription:unsubscribe() end + end) + end) +end diff --git a/tests/observable.lua b/tests/observable.lua index 49e4a83..7e17c43 100644 --- a/tests/observable.lua +++ b/tests/observable.lua @@ -260,6 +260,7 @@ describe('Observable', function() dofile('tests/reduce.lua') dofile('tests/reject.lua') dofile('tests/retry.lua') + dofile('tests/sample.lua') dofile('tests/scan.lua') dofile('tests/skip.lua') dofile('tests/skipLast.lua') diff --git a/tests/sample.lua b/tests/sample.lua new file mode 100644 index 0000000..69c9e49 --- /dev/null +++ b/tests/sample.lua @@ -0,0 +1,58 @@ +describe('sample', function() + it('errors if no sampler is specified', function() + expect(function() Rx.Observable.empty():sample() end).to.fail() + end) + + it('produces nil values if the sampler fires before the source does', function() + local sampler = Rx.Observable.fromRange(3) + local onNext, onError, onCompleted = observableSpy(Rx.Observable.empty():sample(sampler)) + expect(#onNext).to.equal(3) + expect(next(onNext[1])).to_not.exist() + expect(next(onNext[2])).to_not.exist() + expect(next(onNext[3])).to_not.exist() + end) + + it('produces the latest value produced by the source when the sampler fires', function() + local a = Rx.Subject.create() + local b = Rx.Subject.create() + local onNext, onError, onCompleted = observableSpy(a:sample(b)) + a:onNext(1) + b:onNext('a') + b:onNext('b') + a:onNext(2) + a:onNext(3) + b:onNext('c') + a:onCompleted() + b:onCompleted() + expect(onNext).to.equal({{1}, {1}, {3}}) + end) + + it('completes when the sampler completes', function() + local a = Rx.Subject.create() + local b = Rx.Subject.create() + local onNext, onError, onCompleted = observableSpy(a:sample(b)) + a:onNext(1) + a:onCompleted() + b:onNext('a') + b:onNext('b') + b:onNext('c') + expect(#onCompleted).to.equal(0) + b:onCompleted() + expect(#onCompleted).to.equal(1) + expect(onNext).to.equal({{1}, {1}, {1}}) + end) + + it('errors when the source errors', function() + local a = Rx.Observable.throw() + local b = Rx.Observable.fromRange(3) + local onNext, onError, onCompleted = observableSpy(a:sample(b)) + expect(#onError).to.equal(1) + end) + + it('errors when the sampler errors', function() + local a = Rx.Observable.fromRange(3) + local b = Rx.Observable.throw() + local onNext, onError, onCompleted = observableSpy(a:sample(b)) + expect(#onError).to.equal(1) + end) +end) diff --git a/tools/build.lua b/tools/build.lua index 07710a7..2c1466b 100644 --- a/tools/build.lua +++ b/tools/build.lua @@ -41,6 +41,7 @@ local files = { 'src/operators/reduce.lua', 'src/operators/reject.lua', 'src/operators/retry.lua', + 'src/operators/sample.lua', 'src/operators/scan.lua', 'src/operators/skip.lua', 'src/operators/skipLast.lua',