diff --git a/doc/README.md b/doc/README.md index 68262b0..fae4b01 100644 --- a/doc/README.md +++ b/doc/README.md @@ -18,7 +18,7 @@ RxLua - [of](#ofvalues) - [fromRange](#fromrangeinitial-limit-step) - [fromTable](#fromtabletable-iterator-keys) - - [fromCoroutine](#fromcoroutinecoroutine) + - [fromCoroutine](#fromcoroutinefn) - [fromFileByLine](#fromfilebylinefilename) - [defer](#deferfactory) - [replicate](#replicatevalue-count) @@ -250,13 +250,13 @@ Creates an Observable that produces values from a table. --- -#### `.fromCoroutine(coroutine)` +#### `.fromCoroutine(fn)` Creates an Observable that produces values when the specified coroutine yields. | Name | Type | Default | Description | |------|------|---------|-------------| -| `coroutine` | thread | | | +| `fn` | thread|function | | A coroutine or function to use to generate values. Note that if a coroutine is used, the values it yields will be shared by all subscribed Observers (influenced by the Scheduler), whereas a new coroutine will be created for each Observer when a function is used. | --- diff --git a/rx.lua b/rx.lua index 633113d..941f009 100644 --- a/rx.lua +++ b/rx.lua @@ -197,11 +197,14 @@ function Observable.fromTable(t, iterator, keys) end --- Creates an Observable that produces values when the specified coroutine yields. --- @arg {thread} coroutine +-- @arg {thread|function} fn - A coroutine or function to use to generate values. Note that if a +-- coroutine is used, the values it yields will be shared by all +-- subscribed Observers (influenced by the Scheduler), whereas a new +-- coroutine will be created for each Observer when a function is used. -- @returns {Observable} -function Observable.fromCoroutine(thread, scheduler) - thread = type(thread) == 'function' and coroutine.create(thread) or thread +function Observable.fromCoroutine(fn, scheduler) return Observable.create(function(observer) + local thread = type(fn) == 'function' and coroutine.create(fn) or fn return scheduler:schedule(function() while not observer.stopped do local success, value = coroutine.resume(thread) diff --git a/src/observable.lua b/src/observable.lua index bd0b4da..7b7d27f 100644 --- a/src/observable.lua +++ b/src/observable.lua @@ -102,11 +102,14 @@ function Observable.fromTable(t, iterator, keys) end --- Creates an Observable that produces values when the specified coroutine yields. --- @arg {thread} coroutine +-- @arg {thread|function} fn - A coroutine or function to use to generate values. Note that if a +-- coroutine is used, the values it yields will be shared by all +-- subscribed Observers (influenced by the Scheduler), whereas a new +-- coroutine will be created for each Observer when a function is used. -- @returns {Observable} -function Observable.fromCoroutine(thread, scheduler) - thread = type(thread) == 'function' and coroutine.create(thread) or thread +function Observable.fromCoroutine(fn, scheduler) return Observable.create(function(observer) + local thread = type(fn) == 'function' and coroutine.create(fn) or fn return scheduler:schedule(function() while not observer.stopped do local success, value = coroutine.resume(thread) diff --git a/tests/observable.lua b/tests/observable.lua index 7cfcedf..750512f 100644 --- a/tests/observable.lua +++ b/tests/observable.lua @@ -177,6 +177,40 @@ describe('Observable', function() until Rx.scheduler:isEmpty() expect(onNext).to.equal({{1}, {2}, {3}}) end) + + it('shares values among Observers when the first argument is a coroutine', function() + local coroutine = coroutine.create(function() + coroutine.yield(1) + coroutine.yield(2) + return 3 + end) + + Rx.scheduler = Rx.CooperativeScheduler.create() + local observable = Rx.Observable.fromCoroutine(coroutine, Rx.scheduler) + local onNextA = observableSpy(observable) + local onNextB = observableSpy(observable) + repeat Rx.scheduler:update() + until Rx.scheduler:isEmpty() + expect(onNextA).to.equal({{2}}) + expect(onNextB).to.equal({{1}, {3}}) + end) + + it('uses a unique coroutine for each Observer when the first argument is a function', function() + local coroutine = function() + coroutine.yield(1) + coroutine.yield(2) + return 3 + end + + Rx.scheduler = Rx.CooperativeScheduler.create() + local observable = Rx.Observable.fromCoroutine(coroutine, Rx.scheduler) + local onNextA = observableSpy(observable) + local onNextB = observableSpy(observable) + repeat Rx.scheduler:update() + until Rx.scheduler:isEmpty() + expect(onNextA).to.equal({{1}, {2}, {3}}) + expect(onNextB).to.equal({{1}, {2}, {3}}) + end) end) describe('defer', function()