The Reduce operator applies a function to the first item emitted by the source Observable and then feeds the result of the function back into the function along with the second item emitted by the source Observable, continuing this process until the source Observable emits its final item and completes, whereupon the Observable returned from Reduce emits the final value returned from the function.
This sort of operation is sometimes called “accumulate,” “aggregate,” “compress,” “fold,” or “inject” in other contexts.
reduce
into iterate reduce
TBD
iterate reduce
collect reduce
The reduce operator returns an Observable that applies a function of your choosing to the first item emitted by a source Observable, then feeds the result of that function along with the second item emitted by the source Observable into the same function, then feeds the result of that function along with the third item into the same function, and so on until all items have been emitted by the source Observable. Then it emits the final result from the final call to your function as the sole output from the returned Observable.
Note that if the source Observable does not emit any items, reduce will fail with an IllegalArgumentException.
IllegalArgumentException
For example, the following code uses reduce to compute, and then emit as an Observable, the sum of the numbers emitted by the source Observable:
numbers = Observable.from([1, 2, 3, 4, 5]); numbers.reduce({ a, b -> a+b }).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
15 Sequence complete
reduce does not by default operate on any particular Scheduler.
reduce(Func2)
There is also a version of reduce to which you can pass a seed item in addition to an accumulator function. Note that passing a null seed is not the same as not passing a seed. The behavior will be different. If you pass a seed of null, you will be seeding your reduction with the item null. Note also that if you do pass in a seed, and the source Observable emits no items, reduce will emit the seed and complete normally without error.
null
reduce(R,Func2)
It is a bad idea to use reduce to collect emitted items into a mutable data structure. Instead, use collect for that purpose.
collect
The collect operator is similar to reduce but is specialized for the purpose of collecting the whole set of items emitted by the source Observable into a single mutable data structure to be emitted by the resulting Observable. Pass it two parameters:
collect does not by default operate on any particular Scheduler.
collect(Func0,Action2)
RxJS implements the reduce operator. Pass it an accumulator function, and, optionally, a seed value to pass into the accumulator function with the first item emitted by the source Observable.
var source = Rx.Observable.range(1, 3) .reduce(function (acc, x) { return acc * x; }, 1) var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 6 Completed
reduce is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.aggregates.js
reduce requires one of the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
Aggregate
RxPHP implements this operator as reduce.
Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/reduce/reduce.php //Without a seed $source = \Rx\Observable::fromArray(range(1, 3)); $subscription = $source ->reduce(function ($acc, $x) { return $acc + $x; }) ->subscribe($createStdoutObserver());
Next value: 6 Complete!
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/reduce/reduce-with-seed.php //With a seed $source = \Rx\Observable::fromArray(range(1, 3)); $subscription = $source ->reduce(function ($acc, $x) { return $acc * $x; }, 1) ->subscribe($createStdoutObserver());
aggregate expand reduce
foldLeft reduce