Do

register an action to take upon a variety of Observable lifecycle events

You can register callbacks that ReactiveX calls when certain events take place on an Observable. These callbacks run independently of the normal set of notifications that flow through an Observable cascade. The various ReactiveX implementations provide a variety of operators for this purpose.
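As a rough illustration of the idea, here is a minimal sketch in plain JavaScript that uses no ReactiveX library. The names `fromArray` and `tap` are hypothetical stand-ins for a source and a Do-style operator. The sketch assumes the callback runs before the matching observer method, and shows only that the registered callbacks act as side effects while the notifications reaching the observer stay unchanged.

```javascript
// Toy model, not a ReactiveX API: an "observable" is a function that
// takes an observer ({ onNext, onError, onCompleted }) and drives it.
function fromArray(items) {
  return function subscribe(observer) {
    for (const item of items) observer.onNext(item);
    observer.onCompleted();
  };
}

// Hypothetical Do-style operator: calls the side-effect callback first,
// then forwards the same notification to the real observer.
function tap(source, callbacks) {
  return function subscribe(observer) {
    source({
      onNext(item) {
        if (callbacks.onNext) callbacks.onNext(item);
        observer.onNext(item);
      },
      onError(err) {
        if (callbacks.onError) callbacks.onError(err);
        observer.onError(err);
      },
      onCompleted() {
        if (callbacks.onCompleted) callbacks.onCompleted();
        observer.onCompleted();
      }
    });
  };
}

const log = [];
const tapped = tap(fromArray([1, 2]), {
  onNext: (x) => log.push('Do Next: ' + x),
  onCompleted: () => log.push('Do Completed')
});

// Subscribing is just calling the function with an observer.
tapped({
  onNext: (x) => log.push('Next: ' + x),
  onError: (e) => log.push('Error: ' + e.message),
  onCompleted: () => log.push('Completed')
});

console.log(log.join('\n'));
```

In this sketch each "Do" entry is logged immediately before the corresponding observer entry, and the observer still receives every item unmodified.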

Language-Specific Information:

RxGroovy has several Do variants.

doOnEach

The doOnEach operator allows you to establish a callback that the resulting Observable will call each time it emits an item. You can pass this callback either in the form of an Action that takes an onNext variety of Notification as its sole parameter, or you can pass in an Observer whose onNext method will be called as if it had subscribed to the Observable.

doOnNext

The doOnNext operator is much like doOnEach(Action1) except that the Action that you pass it as a parameter does not accept a Notification but instead simply accepts the emitted item.

doOnRequest

The doOnRequest operator (new in RxGroovy 1.1) registers an Action which will be called whenever an observer requests additional items from the resulting Observable. That Action receives as its parameter the number of items that the observer is requesting.

doOnSubscribe

The doOnSubscribe operator registers an Action which will be called whenever an observer subscribes to the resulting Observable.

doOnUnsubscribe

The doOnUnsubscribe operator registers an Action which will be called whenever an observer unsubscribes from the resulting Observable.
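The subscribe and unsubscribe hooks described above can be pictured with a small sketch in plain JavaScript. It is a toy model rather than RxGroovy code: `never`, `doOnSubscribe`, and `doOnUnsubscribe` are hypothetical helper functions written for this illustration, and the sketch does not model unsubscription that follows termination.

```javascript
// Toy model, not the RxGroovy API: subscribing to an "observable" returns
// a subscription object with an unsubscribe() method.
function never() {
  return function subscribe(observer) {
    // Emits nothing; the observer is intentionally unused.
    return { unsubscribe() {} };
  };
}

// Hypothetical helper: run `action` each time an observer subscribes.
function doOnSubscribe(source, action) {
  return function subscribe(observer) {
    action();
    return source(observer);
  };
}

// Hypothetical helper: run `action` each time an observer unsubscribes.
function doOnUnsubscribe(source, action) {
  return function subscribe(observer) {
    const inner = source(observer);
    return {
      unsubscribe() {
        action();
        inner.unsubscribe();
      }
    };
  };
}

const events = [];
const observed = doOnUnsubscribe(
  doOnSubscribe(never(), () => events.push('subscribed')),
  () => events.push('unsubscribed')
);

// Two observers subscribe, so each action runs once per observer.
const first = observed({});
const second = observed({});
first.unsubscribe();
second.unsubscribe();

console.log(events.join(', '));
```

The point of the sketch is that the actions are tied to each individual subscription, not to the Observable as a whole.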

doOnCompleted

The doOnCompleted operator registers an Action which will be called if the resulting Observable terminates normally, calling onCompleted.

doOnError

The doOnError operator registers an Action which will be called if the resulting Observable terminates abnormally, calling onError. This Action will be passed the Throwable representing the error.

doOnTerminate

The doOnTerminate operator registers an Action which will be called just before the resulting Observable terminates, whether normally or with an error.

finallyDo

The finallyDo operator registers an Action which will be called just after the resulting Observable terminates, whether normally or with an error.

Sample Code

def numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.finallyDo({ println('Finally'); }).subscribe(
   { println(it); },                          // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence complete"); }          // onCompleted
);

1
2
3
4
5
Sequence complete
Finally

RxJava has several Do variants.

doOnEach

The doOnEach operator allows you to establish a callback that the resulting Observable will call each time it emits an item. You can pass this callback either in the form of an Action that takes an onNext variety of Notification as its sole parameter, or you can pass in an Observer whose onNext method will be called as if it had subscribed to the Observable.

doOnNext

The doOnNext operator is much like doOnEach(Action1) except that the Action that you pass it as a parameter does not accept a Notification but instead simply accepts the emitted item.

Sample Code

Observable.just(1, 2, 3)
    .doOnNext(new Action1<Integer>() {
        @Override
        public void call(Integer item) {
            // An exception thrown here reaches the subscriber as onError
            if (item > 1) {
                throw new RuntimeException("Item exceeds maximum value");
            }
        }
    })
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

Next: 1
Error: Item exceeds maximum value

doOnRequest

The doOnRequest operator (new in RxJava 1.1) registers an Action which will be called whenever an observer requests additional items from the resulting Observable. That Action receives as its parameter the number of items that the observer is requesting.
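To make the request mechanism more concrete, here is a minimal sketch in plain JavaScript. It is a toy model and not the RxJava API: `rangeWithRequests` and `doOnRequest` are hypothetical helpers, and the `request(n)` method is an assumed, simplified stand-in for an observer asking for more items.

```javascript
// Toy model, not the RxJava API: a source that emits only as many items
// as the observer has requested (a simplified form of backpressure).
function rangeWithRequests(start, count) {
  return function subscribe(observer) {
    let next = start;
    const end = start + count;
    return {
      request(n) {
        for (let i = 0; i < n && next < end; i++) observer.onNext(next++);
      }
    };
  };
}

// Hypothetical helper: call `action` with the requested amount before
// passing the request on to the source.
function doOnRequest(source, action) {
  return function subscribe(observer) {
    const inner = source(observer);
    return {
      request(n) {
        action(n);
        inner.request(n);
      }
    };
  };
}

const requestLog = [];
const received = [];
const watched = doOnRequest(rangeWithRequests(1, 5), (n) => requestLog.push(n));
const producer = watched({ onNext: (x) => received.push(x) });

producer.request(2); // the observer asks for two items
producer.request(3); // then for three more

console.log('requested:', requestLog.join(', '));
console.log('received:', received.join(', '));
```

Here the action sees the amounts 2 and 3, one call per request, while the observer receives the five items as usual.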

doOnSubscribe

The doOnSubscribe operator registers an Action which will be called whenever an observer subscribes to the resulting Observable.

doOnUnsubscribe

The doOnUnsubscribe operator registers an Action which will be called whenever an observer unsubscribes from the resulting Observable.

doOnCompleted

The doOnCompleted operator registers an Action which will be called if the resulting Observable terminates normally, calling onCompleted.

doOnError

The doOnError operator registers an Action which will be called if the resulting Observable terminates abnormally, calling onError. This Action will be passed the Throwable representing the error.

doOnTerminate

The doOnTerminate operator registers an Action which will be called just before the resulting Observable terminates, whether normally or with an error.

finallyDo

finallyDo is deprecated since RxJava 1.1.1, in favor of doAfterTerminate with the same behavior.

The finallyDo operator registers an Action which will be called just after the resulting Observable terminates, whether normally or with an error.

doAfterTerminate

The doAfterTerminate operator registers an Action which will be called just after the resulting Observable terminates, whether normally or with an error.
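The difference between "just before" (doOnTerminate) and "just after" (doAfterTerminate) is easiest to see as an ordering of calls. The following is a minimal sketch in plain JavaScript, not RxJava code: `just`, `doOnTerminate`, and `doAfterTerminate` are hypothetical helpers that only model when each action runs relative to the observer's terminal notification.

```javascript
// Toy model, not the RxJava API: an "observable" is a function that
// takes an observer and drives it.
function just(item) {
  return function subscribe(observer) {
    observer.onNext(item);
    observer.onCompleted();
  };
}

// Hypothetical helper: run `action` just BEFORE the terminal notification
// (onCompleted or onError) is passed to the observer.
function doOnTerminate(source, action) {
  return function subscribe(observer) {
    source({
      onNext: (x) => observer.onNext(x),
      onError: (e) => { action(); observer.onError(e); },
      onCompleted: () => { action(); observer.onCompleted(); }
    });
  };
}

// Hypothetical helper: run `action` just AFTER the terminal notification
// has been passed to the observer.
function doAfterTerminate(source, action) {
  return function subscribe(observer) {
    source({
      onNext: (x) => observer.onNext(x),
      onError: (e) => { try { observer.onError(e); } finally { action(); } },
      onCompleted: () => { try { observer.onCompleted(); } finally { action(); } }
    });
  };
}

const order = [];
const source = doAfterTerminate(
  doOnTerminate(just(1), () => order.push('doOnTerminate')),
  () => order.push('doAfterTerminate')
);

source({
  onNext: (x) => order.push('Next: ' + x),
  onError: (e) => order.push('Error: ' + e.message),
  onCompleted: () => order.push('Sequence complete')
});

console.log(order.join('\n'));
```

In this sketch the doOnTerminate action is recorded before "Sequence complete" and the doAfterTerminate action is recorded after it.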

do

RxJS implements the basic Do operator as do or tap (two names for the same operator). You have two choices for how to use this operator:

  1. You can pass it an Observer, in which case do/tap will call that Observer’s methods as though that Observer had subscribed to the resulting Observable.
  2. You can pass in a set of 1–3 individual functions (onNext, onError, and onCompleted) that do/tap will call along with the similarly-named functions of any of its observers.

Sample Code

/* Using an observer */
var observer = Rx.Observer.create(
  function (x) { console.log('Do Next: %s', x); },
  function (err) { console.log('Do Error: %s', err); },
  function () { console.log('Do Completed'); }
);

var source = Rx.Observable.range(0, 3)
    .do(observer);

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Do Completed
Completed

/* Using a function */
var source = Rx.Observable.range(0, 3)
  .do(
    function (x)   { console.log('Do Next:', x); },
    function (err) { console.log('Do Error:', err); },
    function ()    { console.log('Do Completed'); }
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Do Completed
Completed

doOnNext

RxJS also implements doOnNext or tapOnNext (two names for the same operator). It is a specialized form of Do that responds only to the onNext case, by calling a callback function you provide as a parameter. You may also optionally pass a second parameter that will be the “this” object from the point of view of your callback function when it executes.

Sample Code

var source = Rx.Observable.range(0, 3)
  .doOnNext(
    function (x) { this.log('Do Next: %s', x); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Completed

doOnError

RxJS also implements doOnError or tapOnError (two names for the same operator). It is a specialized form of Do that responds only to the onError case, by calling a callback function you provide as a parameter. You may also optionally pass a second parameter that will be the “this” object from the point of view of your callback function when it executes.

Sample Code

var source = Rx.Observable.throw(new Error())
  .doOnError(
    function (err) { this.log('Do Error: %s', err); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Do Error: Error
Error: Error

doOnCompleted

RxJS also implements doOnCompleted or tapOnCompleted (two names for the same operator). It is a specialized form of Do that responds only to the onCompleted case, by calling a callback function you provide as a parameter. You may also optionally pass a second parameter that will be the “this” object from the point of view of your callback function when it executes.

Sample Code

var source = Rx.Observable.range(0, 3)
  .doOnCompleted(
    function () { this.log('Do Completed'); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Next: 0
Next: 1
Next: 2
Do Completed
Completed

finally

RxJS also implements a finally operator. It takes a function that will be called after the resulting Observable terminates, whether normally (onCompleted) or abnormally (onError).

Sample Code

var source = Rx.Observable.throw(new Error())
    .finally(function () {
        console.log('Finally');
    });

var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });

Error: Error
Finally

do/tap, doOnNext/tapOnNext, doOnError/tapOnError, doOnCompleted/tapOnCompleted, and finally are found in each of the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP implements this operator as do.

The do operator invokes an action for each element in the observable sequence, and invokes an action upon graceful or exceptional termination of the sequence. You can use it for debugging, logging, and similar inspection of query behavior, by intercepting the message stream to run arbitrary actions for messages on the pipeline.

When using do, note that the Observer may receive additional events after a stream has completed or errored (for example, when using repeat or resubscribing). If you are using an Observable that extends AbstractObservable, you will not receive these events; for this special case, use the DoObserver. doOnNext, doOnError, and doOnCompleted use the DoObserver internally and will receive these additional events.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/do.php

$source = \Rx\Observable::range(0, 3)
    ->do(
        function ($x) {
            echo 'Do Next:', $x, PHP_EOL;
        },
        function (Throwable $err) {
            echo 'Do Error:', $err->getMessage(), PHP_EOL;
        },
        function () {
            echo 'Do Completed', PHP_EOL;
        }
    );

$subscription = $source->subscribe($stdoutObserver);
   
Do Next:0
Next value: 0
Do Next:1
Next value: 1
Do Next:2
Next value: 2
Do Completed
Complete!
    

RxPHP also has an operator doOnError.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnError.php

$source = \Rx\Observable::error(new Exception('Oops'))
    ->doOnError(function (Throwable $err) {
        echo 'Do Error:', $err->getMessage(), PHP_EOL;
    });

$subscription = $source->subscribe($stdoutObserver);
   
Do Error:Oops
Exception: Oops
    

RxPHP also has an operator doOnCompleted.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnCompleted.php

$source = \Rx\Observable::empty()
    ->doOnCompleted(function () {
        echo 'Do Completed', PHP_EOL;
    });

$subscription = $source->subscribe($stdoutObserver);
   
Do Completed
Complete!
    

RxPHP also has an operator finally.

The finally operator calls a specified function when the source terminates, whether on completion or on error.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/finally/finally.php

Rx\Observable::range(1, 3)
    ->finally(function() {
        echo "Finally\n";
    })
    ->subscribe($stdoutObserver);

   
Next value: 1
Next value: 2
Next value: 3
Complete!
Finally
    
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/finally/finally-error.php

Rx\Observable::range(1, 3)
    ->map(function($value) {
        if ($value == 2) {
            throw new \Exception('error');
        }
        return $value;
    })
    ->finally(function() {
        echo "Finally\n";
    })
    ->subscribe($stdoutObserver);

   
Next value: 1
Exception: error
Finally