RxJS implements this operator as `combineLatest`. It may take a variable number of individual Observables (as well as the combining function) as parameters, or a single `Array` of Observables (as well as the combining function).
Sample Code
/* Two timers ticking at different rates, so their emissions interleave */
var source1 = Rx.Observable
  .interval(100)
  .map(function labelFirst(tick) {
    return 'First: ' + tick;
  });

var source2 = Rx.Observable
  .interval(150)
  .map(function labelSecond(tick) {
    return 'Second: ' + tick;
  });

// Whenever either stream produces a value, join the most recent value of
// each stream; keep only the first four combined results.
var source = source1
  .combineLatest(source2, function joinLatest(first, second) {
    return first + ', ' + second;
  })
  .take(4);

// Log every combined value, any error, and the completion signal.
var subscription = source.subscribe(
  function onNext(value) {
    console.log('Next: ' + value.toString());
  },
  function onError(error) {
    console.log('Error: ' + error);
  },
  function onCompleted() {
    console.log('Completed');
  }
);
Next: First: 0, Second: 0
Next: First: 1, Second: 0
Next: First: 1, Second: 1
Next: First: 2, Second: 1
Completed
RxJS also has a `withLatestFrom` operator. It is similar to `combineLatest`, but only emits items when the single source Observable emits an item (not when any of the Observables that are passed to the operator do, as `combineLatest` does).
Sample Code
/* A slow driving timer and a faster timer whose latest value gets sampled */
var source1 = Rx.Observable
  .interval(140)
  .map(function labelFirst(tick) {
    return 'First: ' + tick;
  });

var source2 = Rx.Observable
  .interval(50)
  .map(function labelSecond(tick) {
    return 'Second: ' + tick;
  });

// Only emissions from source1 trigger output; each one is joined with the
// most recent value seen from source2. Keep only the first four results.
var source = source1
  .withLatestFrom(source2, function joinWithLatest(first, second) {
    return first + ', ' + second;
  })
  .take(4);

// Log every combined value, any error, and the completion signal.
var subscription = source.subscribe(
  function onNext(value) {
    console.log('Next: ' + value.toString());
  },
  function onError(error) {
    console.log('Error: ' + error);
  },
  function onCompleted() {
    console.log('Completed');
  }
);
Next: First: 0, Second: 1
Next: First: 1, Second: 4
Next: First: 2, Second: 7
Next: First: 3, Second: 10
Completed
These two operators are both available in each of the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js