From ff284ad215ce338c7018fde437eba3f4220c5da2 Mon Sep 17 00:00:00 2001 From: Junseong Jang Date: Fri, 31 Aug 2018 12:17:50 +0900 Subject: [PATCH] Fix merge operator to produce a subscription. --- rx.lua | 10 +++++++++- src/operators/merge.lua | 10 +++++++++- tests/merge.lua | 16 ++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/rx.lua b/rx.lua index 90a0cf4..98436bc 100644 --- a/rx.lua +++ b/rx.lua @@ -1049,6 +1049,8 @@ function Observable:merge(...) table.insert(sources, 1, self) return Observable.create(function(observer) + local subscriptions = {} + local function onNext(...) return observer:onNext(...) end @@ -1068,8 +1070,14 @@ function Observable:merge(...) end for i = 1, #sources do - sources[i]:subscribe(onNext, onError, onCompleted(i)) + subscriptions[i] = sources[i]:subscribe(onNext, onError, onCompleted(i)) end + + return Subscription.create(function () + for i = 1, #sources do + if subscriptions[i] then subscriptions[i]:unsubscribe() end + end + end) end) end diff --git a/src/operators/merge.lua b/src/operators/merge.lua index fecbc5b..9a0d251 100644 --- a/src/operators/merge.lua +++ b/src/operators/merge.lua @@ -9,6 +9,8 @@ function Observable:merge(...) table.insert(sources, 1, self) return Observable.create(function(observer) + local subscriptions = {} + local function onNext(...) return observer:onNext(...) end @@ -28,7 +30,13 @@ function Observable:merge(...) end for i = 1, #sources do - sources[i]:subscribe(onNext, onError, onCompleted(i)) + subscriptions[i] = sources[i]:subscribe(onNext, onError, onCompleted(i)) end + + return Subscription.create(function () + for i = 1, #sources do + if subscriptions[i] then subscriptions[i]:unsubscribe() end + end + end) end) end diff --git a/tests/merge.lua b/tests/merge.lua index e5630ad..751b28b 100644 --- a/tests/merge.lua +++ b/tests/merge.lua @@ -4,6 +4,22 @@ describe('merge', function() expect(observable).to.produce(1, 2, 3, 4, 5) end) + it('unsubscribes from all input observables', function() + local observableA = Rx.Observable.create(function(observer) + return + end) + + local unsubscribeB = spy() + local subscriptionB = Rx.Subscription.create(unsubscribeB) + local observableB = Rx.Observable.create(function(observer) + return subscriptionB + end) + + local subscription = observableA:merge(observableB):subscribe() + subscription:unsubscribe() + expect(#unsubscribeB).to.equal(1) + end) + it('produces values from all input observables, in order', function() local observableA = Rx.Subject.create() local observableB = Rx.Subject.create()