Merge pull request #41 from StrangePan/issue-40
Fix zip operators not returning a subscription, issue #40master
commit
78363657ed
9
rx.lua
9
rx.lua
|
@ -1813,6 +1813,7 @@ function Observable.zip(...)
|
|||
return Observable.create(function(observer)
|
||||
local values = {}
|
||||
local active = {}
|
||||
local subscriptions = {}
|
||||
for i = 1, count do
|
||||
values[i] = {n = 0}
|
||||
active[i] = true
|
||||
|
@ -1858,8 +1859,14 @@ function Observable.zip(...)
|
|||
end
|
||||
|
||||
for i = 1, count do
|
||||
sources[i]:subscribe(onNext(i), onError, onCompleted(i))
|
||||
subscriptions[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
|
||||
end
|
||||
|
||||
return Subscription.create(function()
|
||||
for i = 1, count do
|
||||
if subscriptions[i] then subscriptions[i]:unsubscribe() end
|
||||
end
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ function Observable.zip(...)
|
|||
return Observable.create(function(observer)
|
||||
local values = {}
|
||||
local active = {}
|
||||
local subscriptions = {}
|
||||
for i = 1, count do
|
||||
values[i] = {n = 0}
|
||||
active[i] = true
|
||||
|
@ -60,7 +61,13 @@ function Observable.zip(...)
|
|||
end
|
||||
|
||||
for i = 1, count do
|
||||
sources[i]:subscribe(onNext(i), onError, onCompleted(i))
|
||||
subscriptions[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
|
||||
end
|
||||
|
||||
return Subscription.create(function()
|
||||
for i = 1, count do
|
||||
if subscriptions[i] then subscriptions[i]:unsubscribe() end
|
||||
end
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
|
|
@ -3,6 +3,25 @@ describe('zip', function()
|
|||
expect(Rx.Observable.fromRange(1, 5):zip()).to.produce(1, 2, 3, 4, 5)
|
||||
end)
|
||||
|
||||
it('unsubscribes from all input observables', function()
|
||||
local unsubscribeA = spy()
|
||||
local subscriptionA = Rx.Subscription.create(unsubscribeA)
|
||||
local observableA = Rx.Observable.create(function(observer)
|
||||
return subscriptionA
|
||||
end)
|
||||
|
||||
local unsubscribeB = spy()
|
||||
local subscriptionB = Rx.Subscription.create(unsubscribeB)
|
||||
local observableB = Rx.Observable.create(function(observer)
|
||||
return subscriptionB
|
||||
end)
|
||||
|
||||
local subscription = Rx.Observable.zip(observableA, observableB):subscribe()
|
||||
subscription:unsubscribe()
|
||||
expect(#unsubscribeA).to.equal(1)
|
||||
expect(#unsubscribeB).to.equal(1)
|
||||
end)
|
||||
|
||||
it('groups values produced by the sources by their index', function()
|
||||
local observableA = Rx.Observable.fromRange(1, 3)
|
||||
local observableB = Rx.Observable.fromRange(2, 4)
|
||||
|
|
Loading…
Reference in New Issue