Merge pull request #31 from naxxster/fix-with-subscription
Fix with operator subscription to unsubscribe from all sourcesmaster
commit
ebe4eb8ea6
10
rx.lua
10
rx.lua
|
@ -1718,6 +1718,7 @@ function Observable:with(...)
|
|||
|
||||
return Observable.create(function(observer)
|
||||
local latest = setmetatable({}, {__len = util.constant(#sources)})
|
||||
local subscriptions = {}
|
||||
|
||||
local function setLatest(i)
|
||||
return function(value)
|
||||
|
@ -1738,10 +1739,15 @@ function Observable:with(...)
|
|||
end
|
||||
|
||||
for i = 1, #sources do
|
||||
sources[i]:subscribe(setLatest(i), util.noop, util.noop)
|
||||
subscriptions[i] = sources[i]:subscribe(setLatest(i), util.noop, util.noop)
|
||||
end
|
||||
|
||||
return self:subscribe(onNext, onError, onCompleted)
|
||||
subscriptions[#sources + 1] = self:subscribe(onNext, onError, onCompleted)
|
||||
return Subscription.create(function ()
|
||||
for i = 1, #sources + 1 do
|
||||
if subscriptions[i] then subscriptions[i]:unsubscribe() end
|
||||
end
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ function Observable:with(...)
|
|||
|
||||
return Observable.create(function(observer)
|
||||
local latest = setmetatable({}, {__len = util.constant(#sources)})
|
||||
local subscriptions = {}
|
||||
|
||||
local function setLatest(i)
|
||||
return function(value)
|
||||
|
@ -31,9 +32,14 @@ function Observable:with(...)
|
|||
end
|
||||
|
||||
for i = 1, #sources do
|
||||
sources[i]:subscribe(setLatest(i), util.noop, util.noop)
|
||||
subscriptions[i] = sources[i]:subscribe(setLatest(i), util.noop, util.noop)
|
||||
end
|
||||
|
||||
return self:subscribe(onNext, onError, onCompleted)
|
||||
subscriptions[#sources + 1] = self:subscribe(onNext, onError, onCompleted)
|
||||
return Subscription.create(function ()
|
||||
for i = 1, #sources + 1 do
|
||||
if subscriptions[i] then subscriptions[i]:unsubscribe() end
|
||||
end
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
|
|
@ -32,4 +32,21 @@ describe('with', function()
|
|||
subjectA:onCompleted()
|
||||
expect(#onCompleted).to.equal(1)
|
||||
end)
|
||||
|
||||
it('should unsubscribe from all source observables', function()
|
||||
local unsubscribeA = spy()
|
||||
local observableA = Rx.Observable.create(function(observer)
|
||||
return Rx.Subscription.create(unsubscribeA)
|
||||
end)
|
||||
|
||||
local unsubscribeB = spy()
|
||||
local observableB = Rx.Observable.create(function(observer)
|
||||
return Rx.Subscription.create(unsubscribeB)
|
||||
end)
|
||||
|
||||
local subscription = observableA:with(observableB):subscribe()
|
||||
subscription:unsubscribe()
|
||||
expect(#unsubscribeA).to.equal(1)
|
||||
expect(#unsubscribeB).to.equal(1)
|
||||
end)
|
||||
end)
|
||||
|
|
Loading…
Reference in New Issue