RxJS implements the takeUntil operator. You can pass it either an Observable or a Promise. takeUntil monitors that second source, and as soon as it produces an item (for a Promise, as soon as it resolves), takeUntil stops mirroring the source Observable and completes.
Sample Code
// Emit 0, 1, 2, ... once per second, starting immediately,
// and stop as soon as the 5-second timer emits its single item.
var source = Rx.Observable.timer(0, 1000)
  .takeUntil(Rx.Observable.timer(5000));

var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed
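To make the stopping rule concrete without relying on timers, the following is a minimal, library-free sketch of the same behavior. It is not the RxJS implementation: createSubject and the standalone takeUntil function are hypothetical helpers written only for this illustration, and the sketch handles an Observable-like notifier only, not a Promise.

```javascript
// A hand-rolled sketch of takeUntil semantics; this is NOT the RxJS source.
// createSubject and takeUntil are hypothetical helpers for illustration.
function noop() {}

// A tiny "subject": callers push values in, subscribers receive them.
function createSubject() {
  var observers = [];
  function each(method, arg) {
    // Iterate over a copy so observers may dispose during delivery.
    observers.slice().forEach(function (o) { o[method](arg); });
  }
  return {
    subscribe: function (onNext, onError, onCompleted) {
      var o = { onNext: onNext, onError: onError || noop, onCompleted: onCompleted || noop };
      observers.push(o);
      return {
        dispose: function () {
          var i = observers.indexOf(o);
          if (i !== -1) { observers.splice(i, 1); }
        }
      };
    },
    onNext: function (x) { each('onNext', x); },
    onError: function (e) { each('onError', e); },
    onCompleted: function () { each('onCompleted'); }
  };
}

// Mirror `source` until `notifier` emits an item, then complete.
function takeUntil(source, notifier) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      onError = onError || noop;
      onCompleted = onCompleted || noop;
      var done = false;
      var subs = [];
      function disposeAll() { subs.forEach(function (s) { s.dispose(); }); }
      function finish(callback, arg) {
        if (done) { return; }
        done = true;
        disposeAll();
        callback(arg);
      }
      subs.push(source.subscribe(
        function (x) { if (!done) { onNext(x); } },
        function (e) { finish(onError, e); },
        function () { finish(onCompleted); }));
      subs.push(notifier.subscribe(
        function () { finish(onCompleted); },    // first notifier item ends the stream
        function (e) { finish(onError, e); },
        noop));                                  // notifier completing alone changes nothing
      if (done) { disposeAll(); }                // clean up if we finished while subscribing
      return { dispose: function () { finish(noop); } };
    }
  };
}

// Usage: push values by hand instead of waiting on timers.
var log = [];
var source = createSubject();
var stop = createSubject();
takeUntil(source, stop).subscribe(
  function (x) { log.push('Next: ' + x); },
  function (err) { log.push('Error: ' + err); },
  function () { log.push('Completed'); });

source.onNext(0);
source.onNext(1);
stop.onNext('stop');  // the notifier emits, so the result completes
source.onNext(2);     // not delivered: the source subscription was disposed
console.log(log.join('\n'));
```

In this sketch the value emitted by the notifier is ignored; only the fact that it emitted matters. That mirrors the sample above, where the single item from the 5-second timer never appears in the output.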
takeUntil is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
There is also a takeUntilWithTime operator, to which you can pass either an absolute time or an initial duration. It is described on the Take operator page.