commit d64834faa6945f4032f8840e2a2a76cbb87ab79f Author: bjorn Date: Wed Jul 15 00:45:43 2015 -0700 Initial commit; diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..306ae35 --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2015 Bjorn Swenson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..f43d325 --- /dev/null +++ b/README.md @@ -0,0 +1,41 @@ +RxLua +=== + +How to use: + +```lua +local Rx = require 'rx' + +local observable = Rx.Observable.create(function(observer) + observer.onNext(42) +end) + +observable:subscribe( + function(x) + print('onNext: ' .. x) + end, + function(e) + print('onError: ' .. e) + end, + function() + print('Complete') + end +) +``` + +See examples for more details. + +Combinators +--- + +- `first` +- `last` +- `map` +- `reduce` +- `sum` +- `combineLatest` + +License +--- + +MIT, see `LICENSE` for details. diff --git a/examples/coroutine.lua b/examples/coroutine.lua new file mode 100644 index 0000000..90af928 --- /dev/null +++ b/examples/coroutine.lua @@ -0,0 +1,9 @@ +local Rx = require 'rx' + +-- Observable created from a coroutine that produces two values. +local cr = coroutine.create(function() + coroutine.yield('hello') + return 'world' +end) + +Rx.Observable.fromCoroutine(cr):dump('coroutine') diff --git a/examples/simple.lua b/examples/simple.lua new file mode 100644 index 0000000..29d98fd --- /dev/null +++ b/examples/simple.lua @@ -0,0 +1,4 @@ +local Rx = require 'rx' + +-- Create an observable that produces a single value and print it. +Rx.Observable.fromValue(42):subscribe(print) diff --git a/examples/sum.lua b/examples/sum.lua new file mode 100644 index 0000000..edd2ae2 --- /dev/null +++ b/examples/sum.lua @@ -0,0 +1,13 @@ +local Rx = require 'rx' + +-- Uses the 'sum' combinator to sum all values produced by the observable and +-- dumps the resulting output. +local sumObservable = Rx.Observable.create(function(observer) + observer:onNext(1) + observer:onNext(2) + observer:onNext(3) + observer:onNext(4) + observer:onCompleted() +end) + +sumObservable:sum():dump('sum') diff --git a/rx/init.lua b/rx/init.lua new file mode 100644 index 0000000..79b5e35 --- /dev/null +++ b/rx/init.lua @@ -0,0 +1,8 @@ +local path = (...):gsub('%.init+$', '') + +local Rx = {} + +Rx.Observable = require(path .. '.observable') +Rx.Observer = require(path .. '.observer') + +return Rx diff --git a/rx/observable.lua b/rx/observable.lua new file mode 100644 index 0000000..6082d89 --- /dev/null +++ b/rx/observable.lua @@ -0,0 +1,163 @@ +local path = (...):gsub('%.[^%.]+$', '') +local Observer = require(path .. '.observer') + +local Observable = {} +Observable.__index = Observable + +function Observable.create(subscribe) + local self = { + _subscribe = subscribe + } + + return setmetatable(self, Observable) +end + +function Observable.fromValue(value) + return Observable.create(function(observer) + observer:onNext(value) + observer:onCompleted() + end) +end + +function Observable.fromCoroutine(cr) + return Observable.create(function(observer) + while true do + local success, value = coroutine.resume(cr) + observer:onNext(value) + if coroutine.status(cr) == 'dead' then break end + end + + observer:onCompleted() + end) +end + +function Observable:subscribe(onNext, onError, onComplete) + return self._subscribe(Observer.create(onNext, onError, onComplete)) +end + +function Observable:dump(name) + name = name or '' + + local onNext = function(x) print(name .. ' onNext: ' .. x) end + local onError = function(e) error(name .. ' onError: ' .. e) end + local onCompleted = function() print(name .. ' onCompleted') end + + return self:subscribe(onNext, onError, onCompleted) +end + +-- Combinators + +function Observable:first() + return Observable.create(function(observer) + return self:subscribe(function(x) + observer:onNext(x) + observer:onCompleted() + end, + function(e) + observer:onError(e) + end, + function() + observer:onCompleted() + end) + end) +end + +function Observable:last() + return Observable.create(function(observer) + local value + return self:subscribe(function(x) + value = x + end, + function(e) + observer:onError(e) + end, + function() + observer:onNext(value) + observer:onCompleted() + end) + end) +end + +function Observable:map(fn) + fn = fn or function(x) return x end + return Observable.create(function(observer) + return self:subscribe(function(x) + observer:onNext(fn(x)) + end, + function(e) + observer:onError(e) + end, + function() + observer:onCompleted() + end) + end) +end + +function Observable:reduce(accumulator, seed) + return Observable.create(function(observer) + local currentValue = nil or seed + return self:subscribe(function(x) + currentValue = accumulator(currentValue, x) + end, + function(e) + observer:onError(e) + end, + function() + observer:onNext(currentValue) + observer:onCompleted() + end) + end) +end + +function Observable:sum() + return self:reduce(function(x, y) return x + y end, 0) +end + +function Observable:combineLatest(...) + local values = {} + local done = {} + local targets = {...} + local fn = table.remove(targets) + table.insert(targets, 1, self) + + return Observable.create(function(observer) + local function handleNext(k, v) + values[k] = v + local full = true + for i = 1, #targets do + if not values[i] then full = false break end + end + + if full then + observer:onNext(fn(unpack(values))) + end + end + + local function handleCompleted(k) + done[k] = true + local stop = true + for i = 1, #targets do + if not done[i] then stop = false break end + end + + if stop then + observer:onCompleted() + end + end + + for i = 1, #targets do + targets[i]:subscribe(function(x) + values[i] = x + handleNext(i, x) + end, + function(e) + observer:onError(e) + end, + function() + handleCompleted(i) + end) + end + end) +end + +return Observable diff --git a/rx/observer.lua b/rx/observer.lua new file mode 100644 index 0000000..0ec20b2 --- /dev/null +++ b/rx/observer.lua @@ -0,0 +1,38 @@ +local Observer = {} +Observer.__index = Observer + +local function noop() end +local function defaultError(e) error(e) end + +function Observer.create(onNext, onError, onCompleted) + local self = { + _onNext = onNext or noop, + _onError = onError or defaultError, + _onCompleted = onCompleted or noop, + stopped = false + } + + return setmetatable(self, Observer) +end + +function Observer:onNext(value) + if not self.stopped then + self._onNext(value) + end +end + +function Observer:onError(e) + if not self.stopped then + self.stopped = true + self._onError(e) + end +end + +function Observer:onCompleted() + if not self.stopped then + self.stopped = true + self._onCompleted() + end +end + +return Observer