Merge pull request #25 from naxxster/fix-combineLatest-subscription
Fix combineLatest to produce the subscription.
This commit is contained in:
commit
6e43f56e77
9
rx.lua
9
rx.lua
@ -488,6 +488,7 @@ function Observable:combineLatest(...)
|
|||||||
local latest = {}
|
local latest = {}
|
||||||
local pending = {util.unpack(sources)}
|
local pending = {util.unpack(sources)}
|
||||||
local completed = {}
|
local completed = {}
|
||||||
|
local subscription = {}
|
||||||
|
|
||||||
local function onNext(i)
|
local function onNext(i)
|
||||||
return function(value)
|
return function(value)
|
||||||
@ -517,8 +518,14 @@ function Observable:combineLatest(...)
|
|||||||
end
|
end
|
||||||
|
|
||||||
for i = 1, #sources do
|
for i = 1, #sources do
|
||||||
sources[i]:subscribe(onNext(i), onError, onCompleted(i))
|
subscription[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
|
||||||
end
|
end
|
||||||
|
|
||||||
|
return Subscription.create(function ()
|
||||||
|
for i = 1, #subscription do
|
||||||
|
if subscription[i] then subscription[i]:unsubscribe() end
|
||||||
|
end
|
||||||
|
end)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -21,6 +21,7 @@ function Observable:combineLatest(...)
|
|||||||
local latest = {}
|
local latest = {}
|
||||||
local pending = {util.unpack(sources)}
|
local pending = {util.unpack(sources)}
|
||||||
local completed = {}
|
local completed = {}
|
||||||
|
local subscription = {}
|
||||||
|
|
||||||
local function onNext(i)
|
local function onNext(i)
|
||||||
return function(value)
|
return function(value)
|
||||||
@ -50,7 +51,13 @@ function Observable:combineLatest(...)
|
|||||||
end
|
end
|
||||||
|
|
||||||
for i = 1, #sources do
|
for i = 1, #sources do
|
||||||
sources[i]:subscribe(onNext(i), onError, onCompleted(i))
|
subscription[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
|
||||||
|
end
|
||||||
|
|
||||||
|
return Subscription.create(function ()
|
||||||
|
for i = 1, #subscription do
|
||||||
|
if subscription[i] then subscription[i]:unsubscribe() end
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|
end)
|
||||||
end
|
end
|
||||||
|
@ -4,6 +4,25 @@ describe('combineLatest', function()
|
|||||||
expect(observable).to.produce(1, 2, 3, 4, 5)
|
expect(observable).to.produce(1, 2, 3, 4, 5)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
it('unsubscribes from the combined source observables', function()
|
||||||
|
local unsubscribeA = spy()
|
||||||
|
local subscriptionA = Rx.Subscription.create(unsubscribeA)
|
||||||
|
local observableA = Rx.Observable.create(function(observer)
|
||||||
|
return subscriptionA
|
||||||
|
end)
|
||||||
|
|
||||||
|
local unsubscribeB = spy()
|
||||||
|
local subscriptionB = Rx.Subscription.create(unsubscribeB)
|
||||||
|
local observableB = Rx.Observable.create(function(observer)
|
||||||
|
return subscriptionB
|
||||||
|
end)
|
||||||
|
|
||||||
|
local subscription = Rx.Observable.combineLatest(observableA, observableB):subscribe()
|
||||||
|
subscription:unsubscribe()
|
||||||
|
expect(#unsubscribeA).to.equal(1)
|
||||||
|
expect(#unsubscribeB).to.equal(1)
|
||||||
|
end)
|
||||||
|
|
||||||
it('calls the combinator function with all values produced from all input observables once they have all produced a value', function()
|
it('calls the combinator function with all values produced from all input observables once they have all produced a value', function()
|
||||||
local observableA = Rx.Observable.of('a')
|
local observableA = Rx.Observable.of('a')
|
||||||
local observableB = Rx.Observable.of('b')
|
local observableB = Rx.Observable.of('b')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user