diff --git a/rx.lua b/rx.lua index be0af68..09235d7 100644 --- a/rx.lua +++ b/rx.lua @@ -1049,6 +1049,7 @@ function Observable:merge(...) table.insert(sources, 1, self) return Observable.create(function(observer) + local completed = {} local subscriptions = {} local function onNext(...) @@ -1061,9 +1062,9 @@ function Observable:merge(...) local function onCompleted(i) return function() - sources[i] = nil + table.insert(completed, i) - if not next(sources) then + if #completed == #sources then observer:onCompleted() end end diff --git a/src/operators/merge.lua b/src/operators/merge.lua index 9a0d251..d3ffcc5 100644 --- a/src/operators/merge.lua +++ b/src/operators/merge.lua @@ -9,6 +9,7 @@ function Observable:merge(...) table.insert(sources, 1, self) return Observable.create(function(observer) + local completed = {} local subscriptions = {} local function onNext(...) @@ -21,9 +22,9 @@ function Observable:merge(...) local function onCompleted(i) return function() - sources[i] = nil + table.insert(completed, i) - if not next(sources) then + if #completed == #sources then observer:onCompleted() end end diff --git a/tests/merge.lua b/tests/merge.lua index 751b28b..5f19585 100644 --- a/tests/merge.lua +++ b/tests/merge.lua @@ -20,6 +20,20 @@ describe('merge', function() expect(#unsubscribeB).to.equal(1) end) + it('unsubscribes from all input observables included completed', function() + local observableA = Rx.Observable.empty() + + local unsubscribeB = spy() + local subscriptionB = Rx.Subscription.create(unsubscribeB) + local observableB = Rx.Observable.create(function(observer) + return subscriptionB + end) + + local subscription = observableA:merge(Rx.Observable.empty(), observableB):subscribe() + subscription:unsubscribe() + expect(#unsubscribeB).to.equal(1) + end) + it('produces values from all input observables, in order', function() local observableA = Rx.Subject.create() local observableB = Rx.Subject.create()