The Scan operator applies a function to the first item emitted by the source Observable and then emits the result of that function as its own first emission. It also feeds the result of the function back into the function along with the second item emitted by the source Observable in order to generate its second emission. It continues to feed back its own subsequent emissions along with the subsequent emissions from the source Observable in order to create the rest of its sequence.
This sort of operator is sometimes called an “accumulator” in other contexts.
scan
reductions
TBD
RxGroovy implements this operator as scan. The following code, for example, takes an Observable that emits a consecutive sequence of n integers starting with 1 and converts it, via scan, into an Observable that emits the first n triangular numbers:
1
numbers = Observable.from([1, 2, 3, 4, 5]); numbers.scan({ a, b -> a+b }).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
1 3 6 10 15 Sequence complete
scan(Func2)
There is also a variant of scan to which you can pass a seed value to pass to the accumulator function the first time it is called (for the first emission from the source Observable) in place of the result from the missing prior call to the accumulator. Note that if you use this version, scan will emit this seed value as its own initial emission. Note also that passing a seed of null is not the same as passing no seed at all. A null seed is a valid variety of seed.
null
scan(R,Func2)
This operator does not by default operate on any particular Scheduler.
RxJava implements this operator as scan.
Observable.just(1, 2, 3, 4, 5) .scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer sum, Integer item) { return sum + item; } }).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Next: 3 Next: 6 Next: 10 Next: 15 Sequence complete.
expand scan
RxJS implements the scan operator.
var source = Rx.Observable.range(1, 3) .scan( function (acc, x) { return acc + x; }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 3 Next: 6 Completed
You can optionally pass scan a seed value as an additional parameter. scan will pass this seed value to the accumulator function the first time it is called (for the first emission from the source Observable) in place of the result from the missing prior call to the accumulator.
var source = Rx.Observable.range(1, 3) .scan( function (acc, x) { return acc * x; }, 1 ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 1 Next: 2 Next: 6 Completed
scan is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxJS also implements the expand operator, which is somewhat similar. Rather than applying the function to the previous return value of the function combined with the next item emitted from the source Observable, such that the number of items it emits is equal to the number emitted by the source Observable, expand simply feeds the return value from the function back into the function without regard to future emissions from the Observable, such that it will just continue to create new values at its own pace.
expand
var source = Rx.Observable.return(42) .expand(function (x) { return Rx.Observable.return(42 + x); }) .take(5); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Next: 84 Next: 126 Next: 168 Next: 210 Completed
expand is found in each of the following distributions:
rx.experimental.js
expand requires one of the following distributions:
Scan
RxPHP implements this operator as scan.
Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/scan/scan.php //Without a seed $source = Rx\Observable::range(1, 3); $subscription = $source ->scan(function ($acc, $x) { return $acc + $x; }) ->subscribe($createStdoutObserver());
Next value: 1 Next value: 3 Next value: 6 Complete!
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/scan/scan-with-seed.php //With a seed $source = Rx\Observable::range(1, 3); $subscription = $source ->scan(function ($acc, $x) { return $acc * $x; }, 1) ->subscribe($createStdoutObserver());
Next value: 1 Next value: 2 Next value: 6 Complete!