From 060d0878be5d891b2b51ca66c8c3f13c47e2a818 Mon Sep 17 00:00:00 2001 From: bjorn Date: Tue, 27 Oct 2015 20:57:37 -0700 Subject: [PATCH] Observable.catch; --- doc/README.md | 11 +++++++++++ rx.lua | 38 ++++++++++++++++++++++++++++++++++++++ src/observable.lua | 38 ++++++++++++++++++++++++++++++++++++++ tests/catch.lua | 31 +++++++++++++++++++++++++++++++ tests/observable.lua | 1 + 5 files changed, 119 insertions(+) create mode 100644 tests/catch.lua diff --git a/doc/README.md b/doc/README.md index 63af136..58dad08 100644 --- a/doc/README.md +++ b/doc/README.md @@ -24,6 +24,7 @@ RxLua - [amb](#ambobservables) - [average](#average) - [buffer](#buffersize) + - [catch](#catchhandler) - [combineLatest](#combinelatestobservables-combinator) - [compact](#compact) - [concat](#concatsources) @@ -274,6 +275,16 @@ Returns an Observable that buffers values from the original and produces them as --- +#### `:catch(handler)` + +Returns an Observable that intercepts any errors from the previous and replace them with values produced by a new Observable. + +| Name | Type | Default | Description | +|------|------|---------|-------------| +| `handler` | function|Observable | | An Observable or a function that returns an Observable to replace the source Observable in the event of an error. | + +--- + #### `:combineLatest(observables, combinator)` Returns a new Observable that runs a combinator function on the most recent values from a set of Observables whenever any of them produce a new value. The results of the combinator function are produced by the new Observable. diff --git a/rx.lua b/rx.lua index 3c6572e..6dd2569 100644 --- a/rx.lua +++ b/rx.lua @@ -361,6 +361,44 @@ function Observable:buffer(size) end) end +--- Returns an Observable that intercepts any errors from the previous and replace them with values +-- produced by a new Observable. +-- @arg {function|Observable} handler - An Observable or a function that returns an Observable to +-- replace the source Observable in the event of an error. +-- @returns {Observable} +function Observable:catch(handler) + handler = handler and (type(handler) == 'function' and handler or util.constant(handler)) + + return Observable.create(function(observer) + local subscription + + local function onNext(...) + return observer:onNext(...) + end + + local function onError(e) + if not handler then + return observer:onCompleted() + end + + local continue = handler(e) + if continue then + if subscription then subscription:unsubscribe() end + continue:subscribe(observer) + else + observer:onError(e) + end + end + + local function onCompleted() + observer:onCompleted() + end + + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription + end) +end + --- Returns a new Observable that runs a combinator function on the most recent values from a set -- of Observables whenever any of them produce a new value. The results of the combinator function -- are produced by the new Observable. diff --git a/src/observable.lua b/src/observable.lua index b29391e..eac40f1 100644 --- a/src/observable.lua +++ b/src/observable.lua @@ -276,6 +276,44 @@ function Observable:buffer(size) end) end +--- Returns an Observable that intercepts any errors from the previous and replace them with values +-- produced by a new Observable. +-- @arg {function|Observable} handler - An Observable or a function that returns an Observable to +-- replace the source Observable in the event of an error. +-- @returns {Observable} +function Observable:catch(handler) + handler = handler and (type(handler) == 'function' and handler or util.constant(handler)) + + return Observable.create(function(observer) + local subscription + + local function onNext(...) + return observer:onNext(...) + end + + local function onError(e) + if not handler then + return observer:onCompleted() + end + + local continue = handler(e) + if continue then + if subscription then subscription:unsubscribe() end + continue:subscribe(observer) + else + observer:onError(e) + end + end + + local function onCompleted() + observer:onCompleted() + end + + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription + end) +end + --- Returns a new Observable that runs a combinator function on the most recent values from a set -- of Observables whenever any of them produce a new value. The results of the combinator function -- are produced by the new Observable. diff --git a/tests/catch.lua b/tests/catch.lua new file mode 100644 index 0000000..8b7752e --- /dev/null +++ b/tests/catch.lua @@ -0,0 +1,31 @@ +describe('catch', function() + it('ignores errors if no handler is specified', function() + expect(Rx.Observable.throw():catch()).to.produce.nothing() + end) + + it('continues producing values from the specified observable if the source errors', function() + local handler = Rx.Subject.create() + local observable = Rx.Observable.create(function(observer) + observer:onNext(1) + observer:onNext(2) + observer:onError('ohno') + observer:onNext(3) + observer:onCompleted() + end) + + handler:onNext(5) + + local onNext = observableSpy(observable:catch(handler)) + + handler:onNext(6) + handler:onNext(7) + handler:onCompleted() + + expect(onNext).to.equal({{1}, {2}, {6}, {7}}) + end) + + it('allows a function as an argument', function() + local handler = function() return Rx.Observable.empty() end + expect(Rx.Observable.throw():catch(handler)).to.produce.nothing() + end) +end) diff --git a/tests/observable.lua b/tests/observable.lua index c1e5f33..b403d98 100644 --- a/tests/observable.lua +++ b/tests/observable.lua @@ -181,6 +181,7 @@ describe('Observable', function() dofile('tests/amb.lua') dofile('tests/average.lua') dofile('tests/buffer.lua') + dofile('tests/catch.lua') dofile('tests/combineLatest.lua') dofile('tests/compact.lua') dofile('tests/concat.lua')