From 081b2cf55b369e8b9661af1fbc251705e2d2078c Mon Sep 17 00:00:00 2001 From: xster Date: Thu, 17 May 2018 17:13:48 +0900 Subject: [PATCH] Fix combineLatest to produce the subscription. --- rx.lua | 9 ++++++++- src/operators/combineLatest.lua | 9 ++++++++- tests/combineLatest.lua | 19 +++++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/rx.lua b/rx.lua index 32bbf8e..90a0cf4 100644 --- a/rx.lua +++ b/rx.lua @@ -488,6 +488,7 @@ function Observable:combineLatest(...) local latest = {} local pending = {util.unpack(sources)} local completed = {} + local subscription = {} local function onNext(i) return function(value) @@ -517,8 +518,14 @@ function Observable:combineLatest(...) end for i = 1, #sources do - sources[i]:subscribe(onNext(i), onError, onCompleted(i)) + subscription[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i)) end + + return Subscription.create(function () + for i = 1, #subscription do + if subscription[i] then subscription[i]:unsubscribe() end + end + end) end) end diff --git a/src/operators/combineLatest.lua b/src/operators/combineLatest.lua index c7f34c5..d0b7ada 100644 --- a/src/operators/combineLatest.lua +++ b/src/operators/combineLatest.lua @@ -21,6 +21,7 @@ function Observable:combineLatest(...) local latest = {} local pending = {util.unpack(sources)} local completed = {} + local subscription = {} local function onNext(i) return function(value) @@ -50,7 +51,13 @@ function Observable:combineLatest(...) end for i = 1, #sources do - sources[i]:subscribe(onNext(i), onError, onCompleted(i)) + subscription[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i)) end + + return Subscription.create(function () + for i = 1, #subscription do + if subscription[i] then subscription[i]:unsubscribe() end + end + end) end) end diff --git a/tests/combineLatest.lua b/tests/combineLatest.lua index 7bcc3ab..7802c08 100644 --- a/tests/combineLatest.lua +++ b/tests/combineLatest.lua @@ -4,6 +4,25 @@ describe('combineLatest', function() expect(observable).to.produce(1, 2, 3, 4, 5) end) + it('unsubscribes from the combined source observables', function() + local unsubscribeA = spy() + local subscriptionA = Rx.Subscription.create(unsubscribeA) + local observableA = Rx.Observable.create(function(observer) + return subscriptionA + end) + + local unsubscribeB = spy() + local subscriptionB = Rx.Subscription.create(unsubscribeB) + local observableB = Rx.Observable.create(function(observer) + return subscriptionB + end) + + local subscription = Rx.Observable.combineLatest(observableA, observableB):subscribe() + subscription:unsubscribe() + expect(#unsubscribeA).to.equal(1) + expect(#unsubscribeB).to.equal(1) + end) + it('calls the combinator function with all values produced from all input observables once they have all produced a value', function() local observableA = Rx.Observable.of('a') local observableB = Rx.Observable.of('b')