Fix combineLatest to produce the subscription.

master
xster 2018-05-17 17:13:48 +09:00
parent e56cf8917e
commit 081b2cf55b
3 changed files with 35 additions and 2 deletions

9
rx.lua
View File

@ -488,6 +488,7 @@ function Observable:combineLatest(...)
local latest = {}
local pending = {util.unpack(sources)}
local completed = {}
local subscription = {}
local function onNext(i)
return function(value)
@ -517,8 +518,14 @@ function Observable:combineLatest(...)
end
for i = 1, #sources do
sources[i]:subscribe(onNext(i), onError, onCompleted(i))
subscription[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
end
return Subscription.create(function ()
for i = 1, #subscription do
if subscription[i] then subscription[i]:unsubscribe() end
end
end)
end)
end

View File

@ -21,6 +21,7 @@ function Observable:combineLatest(...)
local latest = {}
local pending = {util.unpack(sources)}
local completed = {}
local subscription = {}
local function onNext(i)
return function(value)
@ -50,7 +51,13 @@ function Observable:combineLatest(...)
end
for i = 1, #sources do
sources[i]:subscribe(onNext(i), onError, onCompleted(i))
subscription[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
end
return Subscription.create(function ()
for i = 1, #subscription do
if subscription[i] then subscription[i]:unsubscribe() end
end
end)
end)
end

View File

@ -4,6 +4,25 @@ describe('combineLatest', function()
expect(observable).to.produce(1, 2, 3, 4, 5)
end)
it('unsubscribes from the combined source observables', function()
local unsubscribeA = spy()
local subscriptionA = Rx.Subscription.create(unsubscribeA)
local observableA = Rx.Observable.create(function(observer)
return subscriptionA
end)
local unsubscribeB = spy()
local subscriptionB = Rx.Subscription.create(unsubscribeB)
local observableB = Rx.Observable.create(function(observer)
return subscriptionB
end)
local subscription = Rx.Observable.combineLatest(observableA, observableB):subscribe()
subscription:unsubscribe()
expect(#unsubscribeA).to.equal(1)
expect(#unsubscribeB).to.equal(1)
end)
it('calls the combinator function with all values produced from all input observables once they have all produced a value', function()
local observableA = Rx.Observable.of('a')
local observableB = Rx.Observable.of('b')