diff --git a/doc/README.md b/doc/README.md index e56f898..c964bf4 100644 --- a/doc/README.md +++ b/doc/README.md @@ -286,6 +286,16 @@ Returns: --- +#### `:pack()` + +Returns an Observable that produces the values of the original inside tables. + +Returns: + +- `Observable` + +--- + #### `:pluck(key)` Returns a new Observable that produces values computed by extracting the given key from the tables produced by the original. @@ -379,6 +389,26 @@ Returns: - `Observable` +--- + +#### `:unpack()` + +Returns an Observable that unpacks the tables produced by the original. + +Returns: + +- `Observable` + +--- + +#### `:unwrap()` + +Returns an Observable that takes any values produced by the original that consist of multiple return values and produces each value individually. + +Returns: + +- `Observable` + # Scheduler Schedulers manage groups of Observables. diff --git a/rx.lua b/rx.lua index 2a0693d..ecdfac7 100644 --- a/rx.lua +++ b/rx.lua @@ -1,5 +1,7 @@ local rx +local pack = table.pack or function(...) return {...} end +local unpack = table.unpack or unpack local function noop() end local function identity(x) return x end @@ -438,6 +440,12 @@ function Observable:merge(...) end) end +--- Returns an Observable that produces the values of the original inside tables. +-- @returns {Observable} +function Observable:pack() + return self:map(pack) +end + --- Returns a new Observable that produces values computed by extracting the given key from the -- tables produced by the original. -- @arg {function} key - The key to extract from the table. @@ -611,6 +619,36 @@ function Observable:takeUntil(other) end) end +--- Returns an Observable that unpacks the tables produced by the original. +-- @returns {Observable} +function Observable:unpack() + return self:map(unpack) +end + +--- Returns an Observable that takes any values produced by the original that consist of multiple +-- return values and produces each value individually. +-- @returns {Observable} +function Observable:unwrap() + return Observable.create(function(observer) + local function onNext(...) + local values = {...} + for i = 1, #values do + observer:onNext(values[i]) + end + end + + local function onError(message) + return observer:onError(message) + end + + local function onComplete() + return observer:onComplete() + end + + return self:subscribe(onNext, onError, onComplete) + end) +end + --- @class Scheduler -- @description Schedulers manage groups of Observables. local Scheduler = {}