The Zip method returns an Observable that applies a function of your choosing to
the combination of items emitted, in sequence, by two (or more) other Observables, with the
results of this function becoming the items emitted by the returned Observable. It applies this
function in strict sequence, so the first item emitted by the new Observable will be the result
of the function applied to the first item emitted by Observable #1 and the first item emitted by
Observable #2; the second item emitted by the new zip-Observable will be the result of the
function applied to the second item emitted by Observable #1 and the second item emitted by
Observable #2; and so forth. It will only emit as many items as the number of items emitted by
the source Observable that emits the fewest items.
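The pairing rule described above can be modelled in a few lines of plain JavaScript. This is only a sketch of the semantics, not the code of any Rx implementation: `createZipper` and its callback parameters are hypothetical names, and the per-source queues are an assumption about how out-of-step sources could be reconciled.

```javascript
// Hypothetical helper (not part of any Rx library): a push-based model of zip.
// Each source gets its own queue; a combined item is emitted only once every
// queue holds at least one pending item.
function createZipper(sourceCount, combine, onNext, onCompleted) {
  const queues = Array.from({ length: sourceCount }, () => []);
  const finished = new Array(sourceCount).fill(false);
  let done = false;

  function checkCompleted() {
    // Once a finished source has nothing buffered, no further combination
    // can be formed, so the zipped sequence ends.
    if (!done && queues.some((q, i) => finished[i] && q.length === 0)) {
      done = true;
      onCompleted();
    }
  }

  return {
    // Record an item pushed by source number `index`.
    next(index, item) {
      if (done) return;
      queues[index].push(item);
      if (queues.every((q) => q.length > 0)) {
        onNext(combine(...queues.map((q) => q.shift())));
        checkCompleted();
      }
    },
    // Record that source number `index` will push no more items.
    complete(index) {
      finished[index] = true;
      checkCompleted();
    },
  };
}

const emitted = [];
const zipper = createZipper(
  2,
  (o, e) => [o, e],
  (pair) => emitted.push(pair),
  () => emitted.push('completed')
);

// The first source runs ahead; its items wait in a queue until the second catches up.
[1, 3, 5, 7, 9].forEach((o) => zipper.next(0, o));
zipper.complete(0);
[2, 4, 6].forEach((e) => zipper.next(1, e));
zipper.complete(1);

console.log(JSON.stringify(emitted)); // [[1,2],[3,4],[5,6],"completed"]
```

The items 7 and 9 are never emitted: the second source finished after three items, so the sketch completes with the fewest-items source, matching the description above.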
RxGroovy implements this operator as several variants of zip and also as zipWith, an instance
function version of the operator.
The last argument to zip is a function that accepts an item from each of the Observables being
zipped and returns the item to be emitted in response by the Observable returned from zip. You can
pass the Observables to be zipped either as between two and nine individual parameters, or as a
single parameter: either an Iterable of Observables or an Observable that emits Observables (as
in the illustration above).
odds  = Observable.from([1, 3, 5, 7, 9]);
evens = Observable.from([2, 4, 6]);
Observable.zip(odds, evens, {o, e -> [o, e]}).subscribe(
    { println(it); },                          // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence complete"); }          // onCompleted
);
[1, 2]
[3, 4]
[5, 6]
Sequence complete
Note that in this example, the resulting Observable completes normally after emitting three
items, which is the number of items emitted by the shorter of the two component Observables
(evens, which emits three items).
zip(Iterable<Observable>,FuncN)
zip(Observable<Observable>,FuncN)
zip(Observable,Observable,Func2)
(there are also versions that take up to nine Observables)
The zipWith instance version of this operator always takes two parameters.
The first parameter may be either a simple Observable, or an iterable (as in the illustration
above).
zipWith(Observable,Func2)
zipWith(Iterable,Func2)
zip and zipWith do not by default operate on any particular Scheduler.
RxJava implements this operator as several variants of zip and also as zipWith, an instance
function version of the operator.
The last argument to zip is a function that accepts an item from each of the Observables being
zipped and returns the item to be emitted in response by the Observable returned from zip. You can
pass the Observables to be zipped either as between two and nine individual parameters, or as a
single parameter: either an Iterable of Observables or an Observable that emits Observables (as
in the illustration above).
zip(Iterable<Observable>,FuncN)
zip(Observable<Observable>,FuncN)
zip(Observable,Observable,Func2)
(there are also versions that take up to nine Observables)
The zipWith instance version of this operator always takes two parameters.
The first parameter may be either a simple Observable, or an iterable (as in the illustration
above).
zipWith(Observable,Func2)
zipWith(Iterable,Func2)
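To illustrate the idea behind the Iterable variant, here is a small sketch in plain JavaScript rather than RxJava. It models the source as an array instead of an Observable, and `zipWithIterable` and `naturals` are hypothetical names introduced only for this illustration. The point it tries to show is that values are pulled from the iterable one at a time, so the iterable may even be endless.

```javascript
// Hypothetical helper: pairs each source item with the next value pulled
// from an iterable, stopping as soon as either side runs out.
function zipWithIterable(items, iterable, combine) {
  const iterator = iterable[Symbol.iterator]();
  const results = [];
  for (const item of items) {
    const step = iterator.next();
    if (step.done) break; // the iterable ran out first
    results.push(combine(item, step.value));
  }
  return results;
}

// An endless iterable of indices; only as many values as needed are pulled.
function* naturals() {
  let n = 0;
  while (true) yield n++;
}

const labelled = zipWithIterable(
  ['a', 'b', 'c'],
  naturals(),
  (item, i) => i + ':' + item
);
console.log(labelled); // [ '0:a', '1:b', '2:c' ]
```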
zip and zipWith do not by default operate on any particular Scheduler.
RxJS implements this operator as zip and zipArray.
zip accepts a variable number of Observables or Promises as parameters, followed
by a function that accepts one item emitted by each of those Observables or resolved by those
Promises as input and produces a single item to be emitted by the resulting Observable.
/* Using arguments */
var range = Rx.Observable.range(0, 5);
var source = Rx.Observable.zip(
    range,
    range.skip(1),
    range.skip(2),
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0:1:2
Next: 1:2:3
Next: 2:3:4
Completed
/* Using promises and Observables */
var range = Rx.Observable.range(0, 5);
var source = Rx.Observable.zip(
    RSVP.Promise.resolve(0),
    RSVP.Promise.resolve(1),
    Rx.Observable.return(2),
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0:1:2
Completed
zipArray accepts a variable number of Observables as parameters and returns an
Observable that emits arrays, each one containing the nth item from each
source Observable.
var range = Rx.Observable.range(0, 5);
var source = Rx.Observable.zipArray(
    range,
    range.skip(1),
    range.skip(2)
);
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: [0,1,2]
Next: [1,2,3]
Next: [2,3,4]
Completed
RxJS also implements a similar operator, forkJoin. There are two varieties of this
operator. The first collects the last element emitted by each of the source Observables into an array and
emits this array as its own sole emitted item. You can pass the source Observables to
forkJoin either as individual parameters or as an array of Observables.
var source = Rx.Observable.forkJoin(
    Rx.Observable.return(42),
    Rx.Observable.range(0, 10),
    Rx.Observable.fromArray([1,2,3]),
    RSVP.Promise.resolve(56)
);
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: [42, 9, 3, 56]
Completed
A second variant of forkJoin exists as a prototype function, and you call it on an instance
of one source Observable, passing it another source Observable as a parameter. As a second parameter,
you pass it a function that combines the final item emitted by the two source Observables into the sole
item to be emitted by the resulting Observable.
var source1 = Rx.Observable.return(42);
var source2 = Rx.Observable.range(0, 3);
var source = source1.forkJoin(source2, function (s1, s2) {
    return s1 + s2;
});
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 44
Completed
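The difference from zip is that forkJoin looks only at the final item of each source. The following plain JavaScript sketch models that rule under simplifying assumptions: each finished source is represented as an array of everything it emitted, `forkJoinArrays` and `rangeArray` are hypothetical helpers rather than RxJS functions, and the sketch assumes every source emitted at least one item (what the library does with an empty source is not modelled here).

```javascript
// Hypothetical helper: each source is modelled as an array of its emissions.
// Returns the last items as an array, or passes them to `combine` if given.
function forkJoinArrays(sources, combine) {
  if (sources.some((s) => s.length === 0)) {
    // Assumption of this sketch only, not a statement about RxJS behaviour.
    throw new Error('this sketch assumes every source emitted at least one item');
  }
  const lasts = sources.map((s) => s[s.length - 1]);
  return combine ? combine(...lasts) : lasts;
}

// Array stand-in for a range of `count` integers starting at `start`.
const rangeArray = (start, count) =>
  Array.from({ length: count }, (_, i) => start + i);

// First variety: collect the last items into one array.
const joined = forkJoinArrays([[42], rangeArray(0, 10), [1, 2, 3], [56]]);
console.log(joined); // [ 42, 9, 3, 56 ]

// Second variety: combine the last items of two sources with a function.
const summed = forkJoinArrays([[42], rangeArray(0, 3)], (s1, s2) => s1 + s2);
console.log(summed); // 44
```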
forkJoin is found in the following distributions:
rx.all.js
rx.all.compat.js
rx.experimental.js (requires rx.js, rx.compat.js, rx.lite.js, or rx.lite.compat.js)
RxPHP implements this operator as zip.
Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index. If the result selector function is omitted, a list with the elements of the observable sequences at corresponding indexes will be yielded.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip.php
//Without a result selector
$range = \Rx\Observable::fromArray(range(0, 4));
$source = $range
    ->zip([
        $range->skip(1),
        $range->skip(2)
    ]);
$observer = $createStdoutObserver();
$subscription = $source
    ->subscribe(new CallbackObserver(
        function ($array) use ($observer) {
            $observer->onNext(json_encode($array));
        },
        [$observer, 'onError'],
        [$observer, 'onCompleted']
    ));
Next value: [0,1,2]
Next value: [1,2,3]
Next value: [2,3,4]
Complete!
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip-result-selector.php
//With a result selector
$range = \Rx\Observable::fromArray(range(0, 4));
$source = $range
    ->zip([
        $range->skip(1),
        $range->skip(2)
    ], function ($s1, $s2, $s3) {
        return $s1 . ':' . $s2 . ':' . $s3;
    });
$observer = $createStdoutObserver();
$subscription = $source->subscribe($createStdoutObserver());
Next value: 0:1:2
Next value: 1:2:3
Next value: 2:3:4
Complete!
RxPHP also has an operator forkJoin.
Runs all observable sequences in parallel and collects their last elements.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/forkJoin/forkJoin.php
use Rx\Observable;
$obs1 = Observable::range(1, 4);
$obs2 = Observable::range(3, 5);
$obs3 = Observable::fromArray(['a', 'b', 'c']);
$observable = Observable::forkJoin([$obs1, $obs2, $obs3], function($v1, $v2, $v3) {
    return $v1 . $v2 . $v3;
});
$observable->subscribe($stdoutObserver);
Next value: 47c
Complete!