Merge pull request #16 from bjornbytes/fix-coroutines

Avoid shared coroutines in Observable.fromCoroutine;
This commit is contained in:
Bjorn Swenson 2017-03-21 21:18:07 -07:00 committed by GitHub
commit 2a0bd62725
4 changed files with 49 additions and 9 deletions

View File

@ -18,7 +18,7 @@ RxLua
- [of](#ofvalues)
- [fromRange](#fromrangeinitial-limit-step)
- [fromTable](#fromtabletable-iterator-keys)
- [fromCoroutine](#fromcoroutinecoroutine)
- [fromCoroutine](#fromcoroutinefn)
- [fromFileByLine](#fromfilebylinefilename)
- [defer](#deferfactory)
- [replicate](#replicatevalue-count)
@ -250,13 +250,13 @@ Creates an Observable that produces values from a table.
---
#### `.fromCoroutine(coroutine)`
#### `.fromCoroutine(fn)`
Creates an Observable that produces values when the specified coroutine yields.
| Name | Type | Default | Description |
|------|------|---------|-------------|
| `coroutine` | thread | | |
| `fn` | thread|function | | A coroutine or function to use to generate values. Note that if a coroutine is used, the values it yields will be shared by all subscribed Observers (influenced by the Scheduler), whereas a new coroutine will be created for each Observer when a function is used. |
---

9
rx.lua
View File

@ -197,11 +197,14 @@ function Observable.fromTable(t, iterator, keys)
end
--- Creates an Observable that produces values when the specified coroutine yields.
-- @arg {thread} coroutine
-- @arg {thread|function} fn - A coroutine or function to use to generate values. Note that if a
-- coroutine is used, the values it yields will be shared by all
-- subscribed Observers (influenced by the Scheduler), whereas a new
-- coroutine will be created for each Observer when a function is used.
-- @returns {Observable}
function Observable.fromCoroutine(thread, scheduler)
thread = type(thread) == 'function' and coroutine.create(thread) or thread
function Observable.fromCoroutine(fn, scheduler)
return Observable.create(function(observer)
local thread = type(fn) == 'function' and coroutine.create(fn) or fn
return scheduler:schedule(function()
while not observer.stopped do
local success, value = coroutine.resume(thread)

View File

@ -102,11 +102,14 @@ function Observable.fromTable(t, iterator, keys)
end
--- Creates an Observable that produces values when the specified coroutine yields.
-- @arg {thread} coroutine
-- @arg {thread|function} fn - A coroutine or function to use to generate values. Note that if a
-- coroutine is used, the values it yields will be shared by all
-- subscribed Observers (influenced by the Scheduler), whereas a new
-- coroutine will be created for each Observer when a function is used.
-- @returns {Observable}
function Observable.fromCoroutine(thread, scheduler)
thread = type(thread) == 'function' and coroutine.create(thread) or thread
function Observable.fromCoroutine(fn, scheduler)
return Observable.create(function(observer)
local thread = type(fn) == 'function' and coroutine.create(fn) or fn
return scheduler:schedule(function()
while not observer.stopped do
local success, value = coroutine.resume(thread)

View File

@ -177,6 +177,40 @@ describe('Observable', function()
until Rx.scheduler:isEmpty()
expect(onNext).to.equal({{1}, {2}, {3}})
end)
it('shares values among Observers when the first argument is a coroutine', function()
local coroutine = coroutine.create(function()
coroutine.yield(1)
coroutine.yield(2)
return 3
end)
Rx.scheduler = Rx.CooperativeScheduler.create()
local observable = Rx.Observable.fromCoroutine(coroutine, Rx.scheduler)
local onNextA = observableSpy(observable)
local onNextB = observableSpy(observable)
repeat Rx.scheduler:update()
until Rx.scheduler:isEmpty()
expect(onNextA).to.equal({{2}})
expect(onNextB).to.equal({{1}, {3}})
end)
it('uses a unique coroutine for each Observer when the first argument is a function', function()
local coroutine = function()
coroutine.yield(1)
coroutine.yield(2)
return 3
end
Rx.scheduler = Rx.CooperativeScheduler.create()
local observable = Rx.Observable.fromCoroutine(coroutine, Rx.scheduler)
local onNextA = observableSpy(observable)
local onNextB = observableSpy(observable)
repeat Rx.scheduler:update()
until Rx.scheduler:isEmpty()
expect(onNextA).to.equal({{1}, {2}, {3}})
expect(onNextB).to.equal({{1}, {2}, {3}})
end)
end)
describe('defer', function()