Observable.switch;

This commit is contained in:
bjorn 2015-11-07 15:42:16 -08:00
parent b01c57ff5f
commit 5a27a2e8f0
5 changed files with 110 additions and 0 deletions

View File

@ -61,6 +61,7 @@ RxLua
- [skipLast](#skiplastcount)
- [startWith](#startwithvalues)
- [sum](#sum)
- [switch](#switch)
- [take](#taken)
- [takeUntil](#takeuntilother)
- [takeWhile](#takewhilepredicate)
@ -622,6 +623,12 @@ Returns an Observable that produces a single value representing the sum of the v
---
#### `:switch()`
Given an Observable that produces Observables, returns an Observable that produces the values produced by the most recently produced Observable.
---
#### `:take(n)`
Returns a new Observable that only produces the first n results of the original.

31
rx.lua
View File

@ -1269,6 +1269,37 @@ function Observable:sum()
return self:reduce(function(x, y) return x + y end, 0)
end
--- Given an Observable that produces Observables, returns an Observable that produces the values
-- produced by the most recently produced Observable.
-- @returns {Observable}
function Observable:switch()
return Observable.create(function(observer)
local subscription
local function onNext(...)
return observer:onNext(...)
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
local function switch(source)
if subscription then
subscription:unsubscribe()
end
subscription = source:subscribe(onNext, onError, nil)
end
return self:subscribe(switch, onError, onCompleted)
end)
end
--- Returns a new Observable that only produces the first n results of the original.
-- @arg {number=1} n - The number of elements to produce before completing.
-- @returns {Observable}

32
src/operators/switch.lua Normal file
View File

@ -0,0 +1,32 @@
local Observable = require 'observable'
--- Given an Observable that produces Observables, returns an Observable that produces the values
-- produced by the most recently produced Observable.
-- @returns {Observable}
function Observable:switch()
return Observable.create(function(observer)
local subscription
local function onNext(...)
return observer:onNext(...)
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
local function switch(source)
if subscription then
subscription:unsubscribe()
end
subscription = source:subscribe(onNext, onError, nil)
end
return self:subscribe(switch, onError, onCompleted)
end)
end

39
tests/switch.lua Normal file
View File

@ -0,0 +1,39 @@
describe('switch', function()
it('errors when the source errors', function()
expect(Rx.Observable.throw():switch().subscribe).to.fail()
end)
it('errors when an Observable produced by the source errors', function()
local observable = Rx.Observable.create(function(observer)
observer:onNext(Rx.Observable.throw())
observer:onCompleted()
end)
expect(observable:switch().subscribe).to.fail()
end)
it('produces the values produced by the latest Observable produced by the source', function()
local a = Rx.Subject.create()
local b = Rx.Subject.create()
local c = Rx.Subject.create()
local onNext, onError, onCompleted = observableSpy(a:switch())
b:onNext(1)
a:onNext(b)
b:onNext(2)
b:onNext(3)
c:onNext(7)
a:onNext(c)
b:onNext(4)
c:onNext(8)
b:onCompleted()
c:onNext(9)
c:onCompleted()
a:onCompleted()
expect(onNext).to.equal({{2}, {3}, {8}, {9}})
expect(#onError).to.equal(0)
expect(#onCompleted).to.equal(1)
end)
end)

View File

@ -45,6 +45,7 @@ local files = {
'src/operators/skipLast.lua',
'src/operators/startWith.lua',
'src/operators/sum.lua',
'src/operators/switch.lua',
'src/operators/take.lua',
'src/operators/takeUntil.lua',
'src/operators/takeWhile.lua',