TakeUntil

discard any items emitted by an Observable after a second Observable emits an item

The TakeUntil operator subscribes to the source Observable and begins mirroring it. It also monitors a second Observable that you provide. If this second Observable emits an item or sends a termination notification, the Observable returned by TakeUntil stops mirroring the source Observable and terminates.

In some ReactiveX implementations, TakeUntil will not stop mirroring the source Observable if the second Observable terminates without emitting an item. This inconsistency is unfortunate and confusing, but it is something we have to live with.
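The two behaviors described above can be sketched in plain JavaScript. This is a simplified model and not the code of any ReactiveX implementation: createSubject, the stopOnNotifierComplete flag, and demo are hypothetical names introduced here for illustration, and error notifications are left out for brevity.

```javascript
// Minimal push-based subject. Hypothetical helper for this sketch, not an Rx API.
function createSubject() {
  const observers = new Set();
  return {
    subscribe(observer) {
      observers.add(observer);
      return () => observers.delete(observer); // call to unsubscribe
    },
    next(value) { [...observers].forEach(o => o.next && o.next(value)); },
    complete() { [...observers].forEach(o => o.complete && o.complete()); },
  };
}

// Sketch of TakeUntil: mirror `source` until `notifier` emits an item.
// `stopOnNotifierComplete` (an assumption of this sketch) selects whether the
// notifier terminating without emitting also ends the result.
function takeUntil(source, notifier, stopOnNotifierComplete = true) {
  return {
    subscribe(observer) {
      let done = false;
      let unsubSource = () => {};
      let unsubNotifier = () => {};
      const finish = () => {
        if (done) return;
        done = true;
        unsubSource();
        unsubNotifier();
        if (observer.complete) observer.complete();
      };
      // Watch the notifier first, so that it can stop the mirroring.
      unsubNotifier = notifier.subscribe({
        next: finish,
        complete: () => (stopOnNotifierComplete ? finish() : unsubNotifier()),
      });
      if (!done) {
        unsubSource = source.subscribe({
          next: v => { if (!done && observer.next) observer.next(v); },
          complete: finish,
        });
      }
      // Unsubscribing detaches from both inputs without signalling completion.
      return () => { done = true; unsubSource(); unsubNotifier(); };
    },
  };
}

// The notifier terminates without emitting; the flag decides what happens next.
function demo(stopOnNotifierComplete) {
  const source = createSubject();
  const notifier = createSubject();
  const log = [];
  takeUntil(source, notifier, stopOnNotifierComplete).subscribe({
    next: v => log.push(v),
    complete: () => log.push('completed'),
  });
  source.next(1);
  source.next(2);
  notifier.complete();
  source.next(3);
  source.complete();
  return log;
}

console.log(demo(true));  // [ 1, 2, 'completed' ]
console.log(demo(false)); // [ 1, 2, 3, 'completed' ]
```

With the flag set to true, the result ends as soon as the notifier terminates. With it set to false, the result keeps mirroring the source until the source itself completes.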

Language-Specific Information:

In RxGroovy, this operator is implemented as takeUntil. Note that the second Observable can cause takeUntil to quit emitting items either by emitting an item or by issuing an onError or onCompleted notification.

takeUntil does not by default operate on any particular Scheduler.

A second version of this operator was released in RxGroovy 1.1. Instead of monitoring a second Observable, it uses a predicate function that evaluates the items emitted by the source Observable to decide when to terminate the resulting Observable sequence. In this respect it behaves similarly to TakeWhile.

In RxJava, this operator is implemented as takeUntil. Note that the second Observable can cause takeUntil to quit emitting items either by emitting an item or by issuing an onError or onCompleted notification.

takeUntil does not by default operate on any particular Scheduler.

A second version of this operator was released in RxJava 1.1. Instead of monitoring a second Observable, it uses a predicate function that evaluates the items emitted by the source Observable to decide when to terminate the resulting Observable sequence. In this respect it behaves similarly to TakeWhile.
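The predicate variant can be contrasted with TakeWhile in a short sketch. It is written in plain JavaScript over arrays rather than Observables, and the helper names takeUntilPredicate and takeWhile are hypothetical. It assumes the ordering that the RxJava documentation describes for takeUntil with a stop predicate: each item is emitted first and the predicate is checked afterwards, so the item that satisfies the predicate is still emitted.

```javascript
// Hypothetical array-based helpers for illustration; not Rx APIs.

// Predicate form of takeUntil: emit the item, then check whether to stop.
function takeUntilPredicate(items, stopPredicate) {
  const out = [];
  for (const item of items) {
    out.push(item);
    if (stopPredicate(item)) break;
  }
  return out;
}

// takeWhile: check the item first, and emit it only if the predicate holds.
function takeWhile(items, predicate) {
  const out = [];
  for (const item of items) {
    if (!predicate(item)) break;
    out.push(item);
  }
  return out;
}

const items = [1, 2, 3, 4, 5];
console.log(takeUntilPredicate(items, x => x >= 3)); // [ 1, 2, 3 ]
console.log(takeWhile(items, x => x < 3));           // [ 1, 2 ]
```

Under that assumption the two operators differ only in whether the boundary item is part of the output.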

RxJS implements the takeUntil operator. You can pass it either an Observable or a Promise. takeUntil monitors it, and when it produces an item, takeUntil stops mirroring the source Observable.

Sample Code

var source = Rx.Observable.timer(0, 1000)
    .takeUntil(Rx.Observable.timer(5000));

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed

takeUntil is found in each of the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

There is also a takeUntilWithTime operator to which you can pass an absolute time or an initial duration, but this is described on the Take operator page.

RxPHP implements this operator as takeUntil.

Returns the values from the source observable sequence until the other observable sequence produces a value.

Sample Code

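// from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/take.php
// $stdoutObserver is assumed to be defined by the demo's bootstrap file.

$observable = Rx\Observable::fromArray([21, 42, 63]);
$observable
    ->take(2) // emit only the first two items, then complete
    ->subscribe($stdoutObserver);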
   
Next value: 21
Next value: 42
Complete!