AsyncSubject;

This commit is contained in:
bjorn 2015-11-10 20:19:01 -08:00
parent e4cad53294
commit dfcaaabe5d
9 changed files with 409 additions and 5 deletions

View File

@ -71,6 +71,7 @@ RxLua
- [unwrap](#unwrap)
- [window](#windowsize)
- [with](#withsources)
- [zip](#zipsources)
- [ImmediateScheduler](#immediatescheduler)
- [create](#create)
- [schedule](#scheduleaction)
@ -85,6 +86,12 @@ RxLua
- [onNext](#onnextvalues)
- [onError](#onerrormessage)
- [onCompleted](#oncompleted)
- [AsyncSubject](#asyncsubject)
- [create](#create)
- [subscribe](#subscribeonnext-onerror-oncompleted)
- [onNext](#onnextvalues)
- [onError](#onerrormessage)
- [onCompleted](#oncompleted)
- [BehaviorSubject](#behaviorsubject)
- [create](#createvalue)
- [subscribe](#subscribeonnext-onerror-oncompleted)
@ -712,6 +719,16 @@ Returns an Observable that produces values from the original along with the most
|------|------|---------|-------------|
| `sources` | Observable... | | The Observables to include the most recent values from. |
---
#### `.zip(sources)`
Returns an Observable that merges the values produced by the source Observables by grouping them by their index. The first onNext event contains the first value of all of the sources, the second onNext event contains the second value of all of the sources, and so on. onNext is called a number of times equal to the number of values produced by the Observable that produces the fewest number of values.
| Name | Type | Default | Description |
|------|------|---------|-------------|
| `sources` | Observable... | | The Observables to zip. |
# ImmediateScheduler
Schedules Observables by running all operations immediately.
@ -821,6 +838,54 @@ Signal to all Observers that an error has occurred.
Signal to all Observers that the Subject will not produce any more values.
# AsyncSubject
AsyncSubjects are subjects that produce either no values or a single value. If multiple values are produced via onNext, only the last one is used. If onError is called, then no value is produced and onError is called on any subscribed Observers. If an Observer subscribes and the AsyncSubject has already terminated, the Observer will immediately receive the value or the error.
---
#### `.create()`
Creates a new AsyncSubject.
---
#### `:subscribe(onNext, onError, onCompleted)`
Creates a new Observer and attaches it to the AsyncSubject.
| Name | Type | Default | Description |
|------|------|---------|-------------|
| `onNext` | function|table | | A function called when the AsyncSubject produces a value or an existing Observer to attach to the AsyncSubject. |
| `onError` | function | | Called when the AsyncSubject terminates due to an error. |
| `onCompleted` | function | | Called when the AsyncSubject completes normally. |
---
#### `:onNext(values)`
Pushes zero or more values to the AsyncSubject.
| Name | Type | Default | Description |
|------|------|---------|-------------|
| `values` | *... | | |
---
#### `:onError(message)`
Signal to all Observers that an error has occurred.
| Name | Type | Default | Description |
|------|------|---------|-------------|
| `message` | string (optional) | | A string describing what went wrong. |
---
#### `:onCompleted()`
Signal to all Observers that the AsyncSubject will not produce any more values.
# BehaviorSubject
A Subject that tracks its current value. Provides an accessor to retrieve the most recent pushed value, and all subscribers immediately receive the latest value.

113
rx.lua
View File

@ -10,6 +10,9 @@ util.eq = function(x, y) return x == y end
util.noop = function() end
util.identity = function(x) return x end
util.constant = function(x) return function() return x end end
util.isa = function(object, class)
return type(object) == 'table' and getmetatable(object).__index == class
end
--- @class Subscription
-- @description A handle representing the link between an Observer and an Observable, as well as any
@ -1735,7 +1738,7 @@ end
function Subject:subscribe(onNext, onError, onCompleted)
local observer
if type(onNext) == 'table' then
if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)
@ -1788,6 +1791,103 @@ end
Subject.__call = Subject.onNext
--- @class AsyncSubject
-- @description AsyncSubjects are subjects that produce either no values or a single value. If
-- multiple values are produced via onNext, only the last one is used. If onError is called, then
-- no value is produced and onError is called on any subscribed Observers. If an Observer
-- subscribes and the AsyncSubject has already terminated, the Observer will immediately receive the
-- value or the error.
local AsyncSubject = setmetatable({}, Observable)
AsyncSubject.__index = AsyncSubject
AsyncSubject.__tostring = util.constant('AsyncSubject')
--- Creates a new AsyncSubject.
-- @returns {AsyncSubject}
function AsyncSubject.create()
local self = {
observers = {},
stopped = false,
value = nil,
errorMessage = nil
}
return setmetatable(self, AsyncSubject)
end
--- Creates a new Observer and attaches it to the AsyncSubject.
-- @arg {function|table} onNext|observer - A function called when the AsyncSubject produces a value
-- or an existing Observer to attach to the AsyncSubject.
-- @arg {function} onError - Called when the AsyncSubject terminates due to an error.
-- @arg {function} onCompleted - Called when the AsyncSubject completes normally.
function AsyncSubject:subscribe(onNext, onError, onCompleted)
local observer
if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)
end
if self.value then
observer:onNext(util.unpack(self.value))
observer:onCompleted()
return
elseif self.errorMessage then
observer:onError(self.errorMessage)
return
end
table.insert(self.observers, observer)
return Subscription.create(function()
for i = 1, #self.observers do
if self.observers[i] == observer then
table.remove(self.observers, i)
return
end
end
end)
end
--- Pushes zero or more values to the AsyncSubject.
-- @arg {*...} values
function AsyncSubject:onNext(...)
if not self.stopped then
self.value = util.pack(...)
end
end
--- Signal to all Observers that an error has occurred.
-- @arg {string=} message - A string describing what went wrong.
function AsyncSubject:onError(message)
if not self.stopped then
self.errorMessage = message
for i = 1, #self.observers do
self.observers[i]:onError(self.errorMessage)
end
self.stopped = true
end
end
--- Signal to all Observers that the AsyncSubject will not produce any more values.
function AsyncSubject:onCompleted()
if not self.stopped then
for i = 1, #self.observers do
if self.value then
self.observers[i]:onNext(util.unpack(self.value))
end
self.observers[i]:onCompleted()
end
self.stopped = true
end
end
AsyncSubject.__call = AsyncSubject.onNext
--- @class BehaviorSubject
-- @description A Subject that tracks its current value. Provides an accessor to retrieve the most
-- recent pushed value, and all subscribers immediately receive the latest value.
@ -1817,8 +1917,16 @@ end
-- @arg {function} onError - Called when the Subject terminates due to an error.
-- @arg {function} onCompleted - Called when the Subject completes normally.
function BehaviorSubject:subscribe(onNext, onError, onCompleted)
local observer = Observer.create(onNext, onError, onCompleted)
local observer
if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)
end
Subject.subscribe(self, observer)
if self.value then
observer:onNext(unpack(self.value))
end
@ -1849,5 +1957,6 @@ return {
ImmediateScheduler = ImmediateScheduler,
CooperativeScheduler = CooperativeScheduler,
Subject = Subject,
AsyncSubject = AsyncSubject,
BehaviorSubject = BehaviorSubject
}

View File

@ -0,0 +1,103 @@
local Observable = require 'observable'
local Observer = require 'observer'
local Subscription = require 'subscription'
local util = require 'util'
--- @class AsyncSubject
-- @description AsyncSubjects are subjects that produce either no values or a single value. If
-- multiple values are produced via onNext, only the last one is used. If onError is called, then
-- no value is produced and onError is called on any subscribed Observers. If an Observer
-- subscribes and the AsyncSubject has already terminated, the Observer will immediately receive the
-- value or the error.
local AsyncSubject = setmetatable({}, Observable)
AsyncSubject.__index = AsyncSubject
AsyncSubject.__tostring = util.constant('AsyncSubject')
--- Creates a new AsyncSubject.
-- @returns {AsyncSubject}
function AsyncSubject.create()
local self = {
observers = {},
stopped = false,
value = nil,
errorMessage = nil
}
return setmetatable(self, AsyncSubject)
end
--- Creates a new Observer and attaches it to the AsyncSubject.
-- @arg {function|table} onNext|observer - A function called when the AsyncSubject produces a value
-- or an existing Observer to attach to the AsyncSubject.
-- @arg {function} onError - Called when the AsyncSubject terminates due to an error.
-- @arg {function} onCompleted - Called when the AsyncSubject completes normally.
function AsyncSubject:subscribe(onNext, onError, onCompleted)
local observer
if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)
end
if self.value then
observer:onNext(util.unpack(self.value))
observer:onCompleted()
return
elseif self.errorMessage then
observer:onError(self.errorMessage)
return
end
table.insert(self.observers, observer)
return Subscription.create(function()
for i = 1, #self.observers do
if self.observers[i] == observer then
table.remove(self.observers, i)
return
end
end
end)
end
--- Pushes zero or more values to the AsyncSubject.
-- @arg {*...} values
function AsyncSubject:onNext(...)
if not self.stopped then
self.value = util.pack(...)
end
end
--- Signal to all Observers that an error has occurred.
-- @arg {string=} message - A string describing what went wrong.
function AsyncSubject:onError(message)
if not self.stopped then
self.errorMessage = message
for i = 1, #self.observers do
self.observers[i]:onError(self.errorMessage)
end
self.stopped = true
end
end
--- Signal to all Observers that the AsyncSubject will not produce any more values.
function AsyncSubject:onCompleted()
if not self.stopped then
for i = 1, #self.observers do
if self.value then
self.observers[i]:onNext(util.unpack(self.value))
end
self.observers[i]:onCompleted()
end
self.stopped = true
end
end
AsyncSubject.__call = AsyncSubject.onNext
return AsyncSubject

View File

@ -1,4 +1,5 @@
local Subject = require 'subjects/subject'
local Observer = require 'observer'
local util = require 'util'
--- @class BehaviorSubject
@ -30,8 +31,16 @@ end
-- @arg {function} onError - Called when the Subject terminates due to an error.
-- @arg {function} onCompleted - Called when the Subject completes normally.
function BehaviorSubject:subscribe(onNext, onError, onCompleted)
local observer = Observer.create(onNext, onError, onCompleted)
local observer
if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)
end
Subject.subscribe(self, observer)
if self.value then
observer:onNext(unpack(self.value))
end

View File

@ -1,4 +1,5 @@
local Observable = require 'observable'
local Observer = require 'observer'
local Subscription = require 'subscription'
local util = require 'util'
@ -29,7 +30,7 @@ end
function Subject:subscribe(onNext, onError, onCompleted)
local observer
if type(onNext) == 'table' then
if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)

View File

@ -6,5 +6,8 @@ util.eq = function(x, y) return x == y end
util.noop = function() end
util.identity = function(x) return x end
util.constant = function(x) return function() return x end end
util.isa = function(object, class)
return type(object) == 'table' and getmetatable(object).__index == class
end
return util

111
tests/asyncsubject.lua Normal file
View File

@ -0,0 +1,111 @@
describe('AsyncSubject', function()
describe('create', function()
it('returns an AsyncSubject', function()
expect(Rx.AsyncSubject.create()).to.be.an(Rx.AsyncSubject)
end)
it('does not fail if passed arguments', function()
expect(Rx.AsyncSubject.create(1, 2, 3)).to.be.an(Rx.AsyncSubject)
end)
end)
describe('subscribe', function()
describe('if the AsyncSubject has already completed', function()
it('calls onNext if the AsyncSubject has a value and calls onCompleted', function()
local subject = Rx.AsyncSubject.create()
subject:onNext(5)
subject:onCompleted()
local onNext, onError, onCompleted = spy(), spy(), spy()
subject:subscribe(onNext, onError, onCompleted)
expect(onNext).to.equal({{5}})
expect(#onError).to.equal(0)
expect(#onCompleted).to.equal(1)
end)
it('calls onError on the Observer if the AsyncSubject has an error', function()
local subject = Rx.AsyncSubject.create()
subject:onError('ohno')
local onNext, onError, onCompleted = spy(), spy(), spy()
subject:subscribe(onNext, onError, onCompleted)
expect(#onNext).to.equal(0)
expect(onError).to.equal({{'ohno'}})
expect(#onCompleted).to.equal(0)
end)
end)
describe('if the AsyncSubject has not completed', function()
it('returns a subscription', function()
expect(Rx.AsyncSubject.create():subscribe()).to.be.an(Rx.Subscription)
end)
end)
end)
describe('onNext', function()
it('does not push values to subscribers', function()
local observer = Rx.Observer.create()
local subject = Rx.AsyncSubject.create()
local function run()
subject:onNext(1)
subject:onNext(2)
subject:onNext(3)
end
expect(#spy(observer, '_onNext', run)).to.equal(0)
end)
end)
describe('onError', function()
it('pushes errors to all subscribers', function()
local observers = {
Rx.Observer.create(nil, function() end),
Rx.Observer.create(nil, function() end)
}
local spies = {
spy(observers[1], '_onError'),
spy(observers[2], '_onError')
}
local subject = Rx.AsyncSubject.create()
subject:subscribe(observers[1])
subject:subscribe(observers[2])
subject:onError('ohno')
expect(spies[1]).to.equal({{'ohno'}})
expect(spies[2]).to.equal({{'ohno'}})
end)
end)
describe('onCompleted', function()
it('pushes the last value to all Observers if one is present then calls onCompleted', function()
local observers = {}
local spies = {}
for i = 1, 2 do
observers[i] = Rx.Observer.create()
spies[i] = {}
spies[i].onNext = spy(observers[i], '_onNext')
spies[i].onError = spy(observers[i], '_onError')
spies[i].onCompleted = spy(observers[i], '_onCompleted')
end
local subject = Rx.AsyncSubject.create()
subject:subscribe(observers[1])
subject:subscribe(observers[2])
subject:onNext(1)
subject:onNext(2)
subject:onCompleted()
for i = 1, 2 do
expect(spies[i].onNext).to.equal({{2}})
expect(#spies[i].onError).to.equal(0)
expect(#spies[i].onCompleted).to.equal(1)
end
end)
end)
end)

View File

@ -57,7 +57,8 @@ else
local files = {
'observer',
'observable',
'subscription'
'subscription',
'asyncsubject'
}
for i, file in ipairs(files) do

View File

@ -59,6 +59,7 @@ local files = {
'src/schedulers/immediatescheduler.lua',
'src/schedulers/cooperativescheduler.lua',
'src/subjects/subject.lua',
'src/subjects/asyncsubject.lua',
'src/subjects/behaviorsubject.lua',
'src/aliases.lua'
}
@ -78,6 +79,7 @@ local footer = [[return {
ImmediateScheduler = ImmediateScheduler,
CooperativeScheduler = CooperativeScheduler,
Subject = Subject,
AsyncSubject = AsyncSubject,
BehaviorSubject = BehaviorSubject
}]]