You can register callbacks that ReactiveX will call when certain events take place on an Observable, where those callbacks will be called independently from the normal set of notifications associated with an Observable cascade. There are a variety of operators that various ReactiveX implementations have designed to allow for this.
TBD
RxGroovy has several Do variants.
The doOnEach operator allows you to establish a callback that the resulting Observable will
call each time it emits an item. You can pass this callback either in the form of an Action
that takes an onNext variety of Notification as its sole parameter, or you can
pass in an Observer whose onNext method will be called as if it had subscribed to the
Observable.
doOnEach(Action1)doOnEach(Observer)
The doOnNext operator is much like doOnEach(Action1) except that the
Action that you pass it as a parameter does not accept a Notification but
instead simply accepts the emitted item.
doOnNext(Action1)
The doOnRequest operator (new in RxGroovy 1.1) registers an Action which will be
called whenever an observer requests additional items from the resulting Observable. That
Action receives as its parameter the number of items that the observer is requesting.
doOnRequest(Action1)
The doOnSubscribe operator registers an Action which will be called whenever
an observer subscribes to the resulting Observable.
doOnSubscribe(Action0)
The doOnUnsubscribe operator registers an Action which will be called whenever
an observer unsubscribes from the resulting Observable.
doOnUnsubscribe(Action0)
The doOnCompleted operator registers an Action which will be called if the
resulting Observable terminates normally, calling onCompleted.
doOnCompleted(Action0)
The doOnError operator registers an Action which will be called if the
resulting Observable terminates abnormally, calling onError. This Action will
be passed the Throwable representing the error.
doOnError(Action1)
The doOnTerminate operator registers an Action which will be called just
before the resulting Observable terminates, whether normally or with an error.
doOnTerminate(Action0)
The finallyDo operator registers an Action which will be called just
after the resulting Observable terminates, whether normally or with an error.
def numbers = Observable.from([1, 2, 3, 4, 5]);
numbers.finallyDo({ println('Finally'); }).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);1 2 3 4 5 Sequence complete Finally
finallyDo(Action0)RxJava has several Do variants.
The doOnEach operator allows you to establish a callback that the resulting Observable will
call each time it emits an item. You can pass this callback either in the form of an Action
that takes an onNext variety of Notification as its sole parameter, or you can
pass in an Observer whose onNext method will be called as if it had subscribed to the
Observable.
doOnEach(Action1)doOnEach(Observer)
The doOnNext operator is much like doOnEach(Action1) except that the
Action that you pass it as a parameter does not accept a Notification but
instead simply accepts the emitted item.
Observable.just(1, 2, 3)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer item) {
if( item > 1 ) {
throw new RuntimeException( "Item exceeds maximum value" );
}
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});Next: 1 Error: Item exceeds maximum value
doOnNext(Action1)
The doOnRequest operator (new in RxJava 1.1) registers an Action which will be
called whenever an observer requests additional items from the resulting Observable. That
Action receives as its parameter the number of items that the observer is requesting.
doOnRequest(Action1)
The doOnSubscribe operator registers an Action which will be called whenever
an observer subscribes to the resulting Observable.
doOnSubscribe(Action0)
The doOnUnsubscribe operator registers an Action which will be called whenever
an observer unsubscribes from the resulting Observable.
doOnUnsubscribe(Action0)
The doOnCompleted operator registers an Action which will be called if the
resulting Observable terminates normally, calling onCompleted.
doOnCompleted(Action0)
The doOnError operator registers an Action which will be called if the
resulting Observable terminates abnormally, calling onError. This Action will
be passed the Throwable representing the error.
doOnError(Action1)
The doOnTerminate operator registers an Action which will be called just
before the resulting Observable terminates, whether normally or with an error.
doOnTerminate(Action0)
finallyDo is deprecated since RxJava 1.1.1, in favor of
doAfterTerminate with the same behavior.
The finallyDo operator registers an Action which will be called just
after the resulting Observable terminates, whether normally or with an error.
finallyDo(Action0)
The doAfterTerminate operator registers an Action which will be called just
after the resulting Observable terminates, whether normally or with an error.
doAfterTerminate(Action0)
RxJS implements the basic Do operator as do or tap
(two names for the same operator). You have two choices for how to use this operator:
do/tap will call that
Observer’s methods as though that Observer had subscribed to the resulting Observable.onNext, onError,
and onCompleted) that do/tap will call along with the
similarly-named functions of any of its observers.
/* Using an observer */
var observer = Rx.Observer.create(
function (x) { console.log('Do Next: %s', x); },
function (err) { console.log('Do Error: %s', err); },
function () { console.log('Do Completed'); }
);
var source = Rx.Observable.range(0, 3)
.do(observer);
var subscription = source.subscribe(
function (x) { console.log('Next: %s', x); },
function (err) { console.log('Error: %s', err); },
function () { console.log('Completed'); });Do Next: 0 Next: 0 Do Next: 1 Next: 1 Do Next: 2 Next: 2 Do Completed Completed
/* Using a function */
var source = Rx.Observable.range(0, 3)
.do(
function (x) { console.log('Do Next:', x); },
function (err) { console.log('Do Error:', err); },
function () { console.log('Do Completed'); }
);
var subscription = source.subscribe(
function (x) { console.log('Next: %s', x); },
function (err) { console.log('Error: %s', err); },
function () { console.log('Completed'); });Do Next: 0 Next: 0 Do Next: 1 Next: 1 Do Next: 2 Next: 2 Do Completed Completed
RxJS also implements doOnNext or tapOnNext (two names for the same operator).
It is a specialized form of Do that responds only to the
onNext case, by calling a callback function you provide as a parameter. You may also
optionally pass a second parameter that will be the “this” object from the
point of view of your callback function when it executes.
var source = Rx.Observable.range(0, 3)
.doOnNext(
function () { this.log('Do Next: %s', x); },
console
);
var subscription = source.subscribe(
function (x) { console.log('Next: %s', x); },
function (err) { console.log('Error: %s', err); },
function () { console.log('Completed'); });Do Next: 0 Next: 0 Do Next: 1 Next: 1 Do Next: 2 Next: 2 Completed
RxJS also implements doOnError or tapOnError (two names for the same operator).
It is a specialized form of Do that responds only to the
onError case, by calling a callback function you provide as a parameter. You may also
optionally pass a second parameter that will be the “this” object from the
point of view of your callback function when it executes.
var source = Rx.Observable.throw(new Error());
.doOnError(
function (err) { this.log('Do Error: %s', err); },
console
);
var subscription = source.subscribe(
function (x) { console.log('Next: %s', x); },
function (err) { console.log('Error: %s', err); },
function () { console.log('Completed'); });Do Error: Error Error: Error
RxJS also implements doOnCompleted or tapOnCompleted (two names for the same
operator). It is a specialized form of Do that responds only to the
onCompleted case, by calling a callback function you provide as a parameter. You may also
optionally pass a second parameter that will be the “this” object from the
point of view of your callback function when it executes.
var source = Rx.Observable.range(0, 3)
.doOnCompleted(
function () { this.log('Do Completed'); },
console
);
var subscription = source.subscribe(
function (x) { console.log('Next: %s', x); },
function (err) { console.log('Error: %s', err); },
function () { console.log('Completed'); });Next: 0 Next: 1 Next: 2 Do Completed Completed
RxJS also implements a finally operator. It takes a function that will be called after the
resulting Observable terminates, whether normally (onCompleted) or abnormally
(onError).
var source = Rx.Observable.throw(new Error())
.finally(function () {
console.log('Finally');
});
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });Error: Error Finally
do/tap, doOnNext/tapOnNext,
doOnError/tapOnError, doOnCompleted/tapOnCompleted, and
finally are found in each of the following distributions:
rx.jsrx.all.jsrx.all.compat.jsrx.compat.jsrx.lite.jsrx.lite.compat.jsTBD
RxPHP implements this operator as doOnEach.
Invokes an action for each element in the observable sequence and invokes an action upon graceful or exceptional termination of the observable sequence. This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnEach.php
$source = \Rx\Observable::range(0, 3)
->doOnEach(new \Rx\Observer\CallbackObserver(
function ($x) {
echo 'Do Next:', $x, PHP_EOL;
},
function (Exception $err) {
echo 'Do Error:', $err->getMessage(), PHP_EOL;
},
function () {
echo 'Do Completed', PHP_EOL;
}
));
$subscription = $source->subscribe($stdoutObserver);
Do Next:0
Next value: 0
Do Next:1
Next value: 1
Do Next:2
Next value: 2
Do Completed
Complete!
RxPHP also has an operator doOnNext.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnNext.php
$source = \Rx\Observable::range(0, 3)
->doOnNext(function ($x) {
echo 'Do Next:', $x, PHP_EOL;
});
$subscription = $source->subscribe($stdoutObserver);
Do Next:0
Next value: 0
Do Next:1
Next value: 1
Do Next:2
Next value: 2
Complete!
RxPHP also has an operator doOnError.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnError.php
$source = \Rx\Observable::error(new Exception('Oops'))
->doOnError(function (Exception $err) {
echo 'Do Error:', $err->getMessage(), PHP_EOL;
});
$subscription = $source->subscribe($stdoutObserver);
Do Error:Oops
Exception: Oops
RxPHP also has an operator doOnCompleted.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/do/doOnCompleted.php
$source = \Rx\Observable::emptyObservable()
->doOnCompleted(function () {
echo 'Do Completed', PHP_EOL;
});
$subscription = $source->subscribe($stdoutObserver);
Do Completed
Complete!
TBD
TBD