You can register callbacks that ReactiveX will call when certain events take place on an Observable. These callbacks are invoked independently of the normal set of notifications delivered through an Observable cascade. The various ReactiveX implementations provide a number of operators for this purpose.
do finally
TBD
finally
doOnCompleted doOnEach doOnError doOnNext doOnRequest doOnSubscribe doOnTerminate doOnUnsubscribe finallyDo
RxGroovy has several Do variants.
The doOnEach operator allows you to establish a callback that the resulting Observable will call each time it emits an item. You can pass this callback either in the form of an Action that takes an onNext variety of Notification as its sole parameter, or you can pass in an Observer whose onNext method will be called as if it had subscribed to the Observable.
doOnEach(Action1)
doOnEach(Observer)
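The Observer form of doOnEach can be pictured with a library-free sketch in plain JavaScript. This is not the RxGroovy API: `fromArray` and `doOnEach` below are hypothetical helpers, an "Observable" is modeled as a function that accepts an observer, and forwarding the terminal events to the side observer is an assumption of this sketch.

```javascript
// Hypothetical stand-in for an Observable: a function that feeds an observer.
function fromArray(items) {
  return function subscribe(observer) {
    for (const item of items) observer.onNext(item);
    observer.onCompleted();
  };
}

// Sketch of doOnEach(Observer): each event goes to the side observer first,
// then to the real downstream observer.
function doOnEach(source, sideObserver) {
  return function subscribe(observer) {
    source({
      onNext(item) { sideObserver.onNext(item); observer.onNext(item); },
      onError(err) { sideObserver.onError(err); observer.onError(err); },
      onCompleted() { sideObserver.onCompleted(); observer.onCompleted(); },
    });
  };
}

const log = [];
const tapped = doOnEach(fromArray([1, 2]), {
  onNext: (x) => log.push("side " + x),
  onError: () => log.push("side error"),
  onCompleted: () => log.push("side completed"),
});

tapped({
  onNext: (x) => log.push("next " + x),
  onError: () => log.push("error"),
  onCompleted: () => log.push("completed"),
});

console.log(log.join(", "));
// prints: side 1, next 1, side 2, next 2, side completed, completed
```

The side observer sees every event before the subscriber does, and the items themselves pass through unchanged.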
The doOnNext operator is much like doOnEach(Action1) except that the Action that you pass it as a parameter does not accept a Notification but instead simply accepts the emitted item.
doOnNext(Action1)
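To illustrate the difference from doOnEach, here is a minimal sketch in plain JavaScript in which the callback receives the emitted item itself rather than a Notification wrapper. The helpers `fromArray` and `doOnNext` are hypothetical and only mimic the behavior described above; they are not the RxGroovy implementation.

```javascript
// Hypothetical stand-in for an Observable: a function that feeds an observer.
const fromArray = (items) => (observer) => {
  for (const item of items) observer.onNext(item);
  observer.onCompleted();
};

// Sketch of doOnNext: the action sees the raw item, and the item is then
// passed downstream unchanged.
const doOnNext = (source, action) => (observer) =>
  source({
    onNext(item) {
      action(item);
      observer.onNext(item);
    },
    onError: (err) => observer.onError(err),
    onCompleted: () => observer.onCompleted(),
  });

const seen = [];
const received = [];
doOnNext(fromArray([10, 20, 30]), (item) => seen.push(item * 2))({
  onNext: (item) => received.push(item),
  onError: () => {},
  onCompleted: () => received.push("done"),
});

console.log(seen.join(","), "|", received.join(","));
// prints: 20,40,60 | 10,20,30,done
```

Whatever the action computes is discarded; only its side effect (here, filling `seen`) is observable.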
The doOnRequest operator (new in RxGroovy 1.1) registers an Action which will be called whenever an observer requests additional items from the resulting Observable. That Action receives as its parameter the number of items that the observer is requesting.
doOnRequest(Action1)
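The request-counting behavior can be sketched with a toy pull-based source in plain JavaScript. Everything here is an assumption made for illustration: `fromArrayWithBackpressure`, `doOnRequest`, and the `request(n)` method on the returned subscription are hypothetical names, and real backpressure handling in RxGroovy is considerably more involved.

```javascript
// Hypothetical pull-based source: emits only as many items as were requested.
function fromArrayWithBackpressure(items) {
  return function subscribe(observer) {
    let index = 0;
    return {
      request(n) {
        const end = Math.min(index + n, items.length);
        while (index < end) observer.onNext(items[index++]);
      },
    };
  };
}

// Sketch of doOnRequest: report the requested count, then forward the
// request upstream.
function doOnRequest(source, action) {
  return function subscribe(observer) {
    const upstream = source(observer);
    return {
      request(n) {
        action(n);
        upstream.request(n);
      },
    };
  };
}

const requests = [];
const items = [];
const subscription = doOnRequest(
  fromArrayWithBackpressure(["a", "b", "c", "d"]),
  (n) => requests.push(n)
)({ onNext: (item) => items.push(item) });

subscription.request(1);
subscription.request(2);

console.log(requests.join(","), "|", items.join(","));
// prints: 1,2 | a,b,c
```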
The doOnSubscribe operator registers an Action which will be called whenever an observer subscribes to the resulting Observable.
doOnSubscribe(Action0)
The doOnUnsubscribe operator registers an Action which will be called whenever an observer unsubscribes from the resulting Observable.
doOnUnsubscribe(Action0)
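The two lifecycle hooks above can be sketched together in plain JavaScript. In this sketch, which is an illustration rather than the RxGroovy implementation, subscribing to an "Observable" returns an unsubscribe function; `just`, `doOnSubscribe`, and `doOnUnsubscribe` are hypothetical helpers.

```javascript
// Hypothetical stand-in for an Observable: subscribing emits one value and
// returns an unsubscribe function.
const just = (value) => (observer) => {
  observer.onNext(value);
  return () => {}; // nothing to clean up in this toy source
};

// Sketch of doOnSubscribe: run the action each time an observer subscribes.
const doOnSubscribe = (source, action) => (observer) => {
  action();
  return source(observer);
};

// Sketch of doOnUnsubscribe: run the action each time an observer unsubscribes.
const doOnUnsubscribe = (source, action) => (observer) => {
  const unsubscribe = source(observer);
  return () => {
    action();
    unsubscribe();
  };
};

const events = [];
const observable = doOnUnsubscribe(
  doOnSubscribe(just(42), () => events.push("subscribed")),
  () => events.push("unsubscribed")
);

const unsubscribe = observable({ onNext: (x) => events.push("next " + x) });
unsubscribe();

console.log(events.join(", "));
// prints: subscribed, next 42, unsubscribed
```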
The doOnCompleted operator registers an Action which will be called if the resulting Observable terminates normally, calling onCompleted.
doOnCompleted(Action0)
The doOnError operator registers an Action which will be called if the resulting Observable terminates abnormally, calling onError. This Action will be passed the Throwable representing the error.
doOnError(Action1)
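A library-free JavaScript sketch may help contrast doOnCompleted and doOnError: each hook fires for only one kind of termination. The helpers `empty`, `fail`, `doOnCompleted`, `doOnError`, and `withHooks` are hypothetical and model only the behavior described above.

```javascript
// Hypothetical sources: one completes normally, the other fails.
const empty = () => (observer) => observer.onCompleted();
const fail = (error) => (observer) => observer.onError(error);

// Sketch of doOnCompleted: the action runs only on normal termination.
const doOnCompleted = (source, action) => (observer) =>
  source({
    onNext: (item) => observer.onNext(item),
    onError: (err) => observer.onError(err),
    onCompleted() { action(); observer.onCompleted(); },
  });

// Sketch of doOnError: the action runs only on failure and receives the error.
const doOnError = (source, action) => (observer) =>
  source({
    onNext: (item) => observer.onNext(item),
    onError(err) { action(err); observer.onError(err); },
    onCompleted: () => observer.onCompleted(),
  });

const events = [];
const observer = {
  onNext: () => {},
  onError: (err) => events.push("error: " + err.message),
  onCompleted: () => events.push("completed"),
};

// Attach both hooks to a source.
const withHooks = (source) =>
  doOnError(
    doOnCompleted(source, () => events.push("doOnCompleted")),
    (err) => events.push("doOnError: " + err.message)
  );

withHooks(empty())(observer);
withHooks(fail(new Error("boom")))(observer);

console.log(events.join(", "));
// prints: doOnCompleted, completed, doOnError: boom, error: boom
```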
The doOnTerminate operator registers an Action which will be called just before the resulting Observable terminates, whether normally or with an error.
doOnTerminate(Action0)
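The "just before termination, either way" behavior can be sketched in plain JavaScript as follows. `empty`, `fail`, and `doOnTerminate` are hypothetical helpers written for this illustration and are not the RxGroovy implementation.

```javascript
// Hypothetical sources: one completes normally, the other fails.
const empty = () => (observer) => observer.onCompleted();
const fail = (error) => (observer) => observer.onError(error);

// Sketch of doOnTerminate: the action runs before the terminal notification
// reaches the observer, for both onCompleted and onError.
const doOnTerminate = (source, action) => (observer) =>
  source({
    onNext: (item) => observer.onNext(item),
    onError(err) { action(); observer.onError(err); },
    onCompleted() { action(); observer.onCompleted(); },
  });

const events = [];
const observer = {
  onNext: () => {},
  onError: (err) => events.push("error: " + err.message),
  onCompleted: () => events.push("completed"),
};

doOnTerminate(empty(), () => events.push("doOnTerminate"))(observer);
doOnTerminate(fail(new Error("boom")), () => events.push("doOnTerminate"))(observer);

console.log(events.join(", "));
// prints: doOnTerminate, completed, doOnTerminate, error: boom
```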
The finallyDo operator registers an Action which will be called just after the resulting Observable terminates, whether normally or with an error.
import rx.Observable

def numbers = Observable.from([1, 2, 3, 4, 5]);

// The finallyDo action runs after the subscriber's terminal callback
numbers.finallyDo({ println('Finally'); }).subscribe(
    { println(it); },                          // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence complete"); }          // onCompleted
);

1
2
3
4
5
Sequence complete
Finally
finallyDo(Action0)
doOnCompleted doOnEach doOnError doOnNext doOnRequest doOnSubscribe doOnTerminate doOnUnsubscribe finallyDo doAfterTerminate
RxJava has several Do variants.
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

Observable.just(1, 2, 3)
    .doOnNext(new Action1<Integer>() {
        @Override
        public void call(Integer item) {
            // An exception thrown here reaches the subscriber as onError
            if (item > 1) {
                throw new RuntimeException("Item exceeds maximum value");
            }
        }
    }).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

Next: 1
Error: Item exceeds maximum value
The doOnRequest operator (new in RxJava 1.1) registers an Action which will be called whenever an observer requests additional items from the resulting Observable. That Action receives as its parameter the number of items that the observer is requesting.
finallyDo has been deprecated since RxJava 1.1.1 in favor of doAfterTerminate, which has the same behavior.
The doAfterTerminate operator registers an Action which will be called just after the resulting Observable terminates, whether normally or with an error.
doAfterTerminate(Action0)
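The "just after" ordering can be sketched in plain JavaScript: the action runs only once the observer's own terminal callback has returned. `fromArray` and `doAfterTerminate` below are hypothetical helpers for illustration, not the RxJava implementation.

```javascript
// Hypothetical stand-in for an Observable: a function that feeds an observer.
const fromArray = (items) => (observer) => {
  for (const item of items) observer.onNext(item);
  observer.onCompleted();
};

// Sketch of doAfterTerminate: deliver the terminal notification first,
// then run the action.
const doAfterTerminate = (source, action) => (observer) =>
  source({
    onNext: (item) => observer.onNext(item),
    onError(err) {
      observer.onError(err);
      action();
    },
    onCompleted() {
      observer.onCompleted();
      action();
    },
  });

const events = [];
doAfterTerminate(fromArray([1, 2]), () => events.push("doAfterTerminate"))({
  onNext: (item) => events.push("next " + item),
  onError: () => events.push("error"),
  onCompleted: () => events.push("completed"),
});

console.log(events.join(", "));
// prints: next 1, next 2, completed, doAfterTerminate
```

Compare this with doOnTerminate, whose action runs before the observer is notified.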
doAfterTerminate doOnComplete doOnDispose doOnEach doOnError doOnLifecycle doOnNext doOnSubscribe doOnTerminate onTerminateDetach
do doOnCompleted doOnError doOnNext finally tap tapOnCompleted tapOnError tapOnNext
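RxJS implements the basic Do operator as do or tap (two names for the same operator). You can use this operator in two ways: pass it an Observer, whose methods will be called as though it had subscribed to the resulting Observable, or pass it up to three individual callback functions (onNext, onError, and onCompleted), which will be called alongside the corresponding callbacks of the eventual observer.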
/* Using an observer */
var observer = Rx.Observer.create(
  function (x) { console.log('Do Next: %s', x); },
  function (err) { console.log('Do Error: %s', err); },
  function () { console.log('Do Completed'); }
);

var source = Rx.Observable.range(0, 3)
  .do(observer);

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Do Completed
Completed

/* Using a function */
var source = Rx.Observable.range(0, 3)
  .do(
    function (x) { console.log('Do Next:', x); },
    function (err) { console.log('Do Error:', err); },
    function () { console.log('Do Completed'); }
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
RxJS also implements doOnNext or tapOnNext (two names for the same operator). It is a specialized form of Do that responds only to the onNext case, by calling a callback function you provide as a parameter. You may also optionally pass a second parameter that will be the “this” object from the point of view of your callback function when it executes.
var source = Rx.Observable.range(0, 3)
  .doOnNext(
    // `this` is the console object passed as the second argument
    function (x) { this.log('Do Next: %s', x); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Do Next: 0
Next: 0
Do Next: 1
Next: 1
Do Next: 2
Next: 2
Completed
RxJS also implements doOnError or tapOnError (two names for the same operator). It is a specialized form of Do that responds only to the onError case, by calling a callback function you provide as a parameter. You may also optionally pass a second parameter that will be the “this” object from the point of view of your callback function when it executes.
var source = Rx.Observable.throw(new Error())
  .doOnError(
    // `this` is the console object passed as the second argument
    function (err) { this.log('Do Error: %s', err); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Do Error: Error
Error: Error
RxJS also implements doOnCompleted or tapOnCompleted (two names for the same operator). It is a specialized form of Do that responds only to the onCompleted case, by calling a callback function you provide as a parameter. You may also optionally pass a second parameter that will be the “this” object from the point of view of your callback function when it executes.
var source = Rx.Observable.range(0, 3)
  .doOnCompleted(
    function () { this.log('Do Completed'); },
    console
  );

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Next: 0
Next: 1
Next: 2
Do Completed
Completed
RxJS also implements a finally operator. It takes a function that will be called after the resulting Observable terminates, whether normally (onCompleted) or abnormally (onError).
var source = Rx.Observable.throw(new Error())
  .finally(function () {
    console.log('Finally');
  });

var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });

Error: Error
Finally
do/tap, doOnNext/tapOnNext, doOnError/tapOnError, doOnCompleted/tapOnCompleted, and finally are found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
doOnCompleted doOnEach doOnError doOnNext doOnSubscribe doOnTerminate doOnUnsubscribe finallyDo
Do Finally
do doOnError doOnCompleted finally
RxPHP implements this operator as do.
Invokes an action for each element in the observable sequence, and invokes an action upon graceful or exceptional termination of the sequence. This method can be used for debugging, logging, and similar tasks: it intercepts the message stream so that arbitrary actions can run for messages on the pipeline.

When using do, note that the Observer may receive additional events after a stream has completed or errored (for example, when using repeat or when resubscribing). If you are using an Observable that extends AbstractObservable, you will not receive these events; for this special case, use the DoObserver. doOnNext, doOnError, and doOnCompleted use the DoObserver internally and will receive these additional events.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/do.php

$source = \Rx\Observable::range(0, 3)
    ->do(
        function ($x) {
            echo 'Do Next:', $x, PHP_EOL;
        },
        function (Throwable $err) {
            echo 'Do Error:', $err->getMessage(), PHP_EOL;
        },
        function () {
            echo 'Do Completed', PHP_EOL;
        }
    );

$subscription = $source->subscribe($stdoutObserver);

Do Next:0
Next value: 0
Do Next:1
Next value: 1
Do Next:2
Next value: 2
Do Completed
Complete!
RxPHP also has an operator doOnError.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnError.php

$source = \Rx\Observable::error(new Exception('Oops'))
    ->doOnError(function (Throwable $err) {
        echo 'Do Error:', $err->getMessage(), PHP_EOL;
    });

$subscription = $source->subscribe($stdoutObserver);

Do Error:Oops
Exception: Oops
RxPHP also has an operator doOnCompleted.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnCompleted.php

$source = \Rx\Observable::empty()
    ->doOnCompleted(function () {
        echo 'Do Completed', PHP_EOL;
    });

$subscription = $source->subscribe($stdoutObserver);

Do Completed
Complete!
RxPHP also has an operator finally.
Calls the specified function when the source terminates, whether by completing or with an error.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/finally/finally.php

Rx\Observable::range(1, 3)
    ->finally(function() {
        echo "Finally\n";
    })
    ->subscribe($stdoutObserver);

Next value: 1
Next value: 2
Next value: 3
Complete!
Finally

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/finally/finally-error.php

Rx\Observable::range(1, 3)
    ->map(function($value) {
        if ($value == 2) {
            throw new \Exception('error');
        }
        return $value;
    })
    ->finally(function() {
        echo "Finally\n";
    })
    ->subscribe($stdoutObserver);

Next value: 1
Exception: error
Finally
do_action finally_action tap
ensures tap
doOn