# Implementing Your Own Operators
You can implement your own Observable operators. This page shows you how.
If your operator is designed to originate an Observable, rather than to transform or react to a source Observable, use the `create( )` method rather than trying to implement `Observable` manually. Otherwise, follow the instructions below.
The following example shows how you can chain a custom operator (in this example: `myOperator`) along with standard RxJava operators by using the `lift( )` operator:
```groovy
Observable foo = barObservable.ofType(Integer).map({it*2}).lift(new myOperator<Integer>()).map({"transformed by myOperator: " + it});
```
The following section shows how to form the scaffolding of your operator so that it works correctly with `lift( )`.
Define your operator as a public class that implements the `Operator` interface, like so:
```java
import rx.Observable.Operator;
import rx.Subscriber;

public class myOperator<T> implements Operator<T, T> {
  public myOperator( /* any necessary params here */ ) {
    /* any necessary initialization here */
  }

  @Override
  public Subscriber<? super T> call(final Subscriber<? super T> s) {
    return new Subscriber<T>(s) {
      @Override
      public void onCompleted() {
        /* add your own onCompleted behavior here, or just pass the completed notification through: */
        if(!s.isUnsubscribed()) {
          s.onCompleted();
        }
      }

      @Override
      public void onError(Throwable t) {
        /* add your own onError behavior here, or just pass the error notification through: */
        if(!s.isUnsubscribed()) {
          s.onError(t);
        }
      }

      @Override
      public void onNext(T item) {
        /* this example performs some sort of simple transformation on each incoming item and then passes it along */
        if(!s.isUnsubscribed()) {
          T transformedItem = myOperatorTransformOperation(item);
          s.onNext(transformedItem);
        }
      }
    };
  }

  /* placeholder for this operator's per-item transformation; as written it returns the item unchanged */
  private T myOperatorTransformOperation(T item) {
    return item;
  }
}
```
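To make it easier to see why this scaffolding works with `lift( )`, here is a simplified model of the wiring. It is a sketch, not RxJava's implementation: `MiniSubscriber`, `MiniOperator`, and `MiniObservable` are hypothetical stand-ins invented for this example so that it runs without the RxJava dependency, and they leave out unsubscription, backpressure, and threading. The idea it illustrates is that `lift( )` hands each downstream subscriber to the operator's `call` method and then subscribes the returned wrapper to the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class LiftSketch {
    /** Hypothetical stand-in for rx.Subscriber, reduced to the three notifications. */
    interface MiniSubscriber<T> {
        void onNext(T item);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical stand-in for Operator: turns a downstream subscriber into an upstream one. */
    interface MiniOperator<R, T> {
        MiniSubscriber<? super T> call(MiniSubscriber<? super R> downstream);
    }

    /** Hypothetical stand-in for Observable: holds the code that runs on subscribe. */
    static final class MiniObservable<T> {
        private final Consumer<MiniSubscriber<? super T>> onSubscribe;

        MiniObservable(Consumer<MiniSubscriber<? super T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(MiniSubscriber<? super T> subscriber) {
            onSubscribe.accept(subscriber);
        }

        /** Wraps each downstream subscriber with the operator, then subscribes the wrapper to this source. */
        <R> MiniObservable<R> lift(MiniOperator<R, T> operator) {
            return new MiniObservable<R>(downstream -> {
                MiniSubscriber<? super T> upstream = operator.call(downstream);
                onSubscribe.accept(upstream);
            });
        }
    }

    /** A source that emits count consecutive integers starting at start, then completes. */
    static MiniObservable<Integer> range(int start, int count) {
        return new MiniObservable<Integer>(s -> {
            for (int i = start; i < start + count; i++) {
                s.onNext(i);
            }
            s.onCompleted();
        });
    }

    /** An operator with the same shape as myOperator: transform each item, pass the rest through. */
    static MiniOperator<Integer, Integer> doubling() {
        return downstream -> new MiniSubscriber<Integer>() {
            @Override public void onNext(Integer item) { downstream.onNext(item * 2); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        };
    }

    /** Runs range(1, 3) through the doubling operator and records every notification. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        range(1, 3).lift(doubling()).subscribe(new MiniSubscriber<Integer>() {
            @Override public void onNext(Integer item) { events.add("next " + item); }
            @Override public void onError(Throwable t) { events.add("error " + t); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next 2, next 4, next 6, completed]
    }
}
```

Because the operator only ever sees subscribers, it does not need to know anything about the source it is attached to, which is what lets it sit anywhere in a chain of standard operators.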
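* Your operator should check its Subscriber's `isUnsubscribed( )` status before it emits any item to (or sends any notification to) the Subscriber. Do not waste time generating items that no Subscriber is interested in seeing.
* Your operator should obey the core tenets of the Observable contract:
  * It may call a Subscriber's `onNext( )` method any number of times, but these calls must be non-overlapping.
  * It may call either a Subscriber's `onCompleted( )` or `onError( )` method, but not both, exactly once, and it may not subsequently call a Subscriber's `onNext( )` method.
  * If you cannot guarantee that your operator conforms to the two tenets above, you can add the `serialize( )` operator to it to force the correct behavior.
* Where possible, compose new operators by combining existing ones. RxJava itself does this with some of its standard operators, for example:
  * `first( )` is defined as `take(1).single( )`
  * `ignoreElements( )` is defined as `filter(alwaysFalse( ))`
  * `reduce(a)` is defined as `scan(a).last( )`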
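* If your operator uses functions or lambdas that are passed in as parameters (predicates, for instance), note that these may throw exceptions; be prepared to catch them and notify subscribers via `onError( )` calls.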
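The guidelines above can be sketched in a small wrapper that applies a caller-supplied function to each item. This is an illustration under stated assumptions, not RxJava code: `MiniSubscriber` and `SafeMap` are hypothetical names invented for this example so that it runs without the RxJava dependency, and the sketch is single-threaded, so it does not address the non-overlapping requirement that `serialize( )` exists for. It shows three things: checking `isUnsubscribed( )` before doing work, routing an exception from the supplied function to `onError( )`, and sending nothing after a terminal notification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class SafeMapSketch {
    /** Hypothetical stand-in for rx.Subscriber, reduced to what this sketch needs. */
    interface MiniSubscriber<T> {
        void onNext(T item);
        void onError(Throwable t);
        void onCompleted();
        boolean isUnsubscribed();
    }

    /** Applies a caller-supplied function to each item and forwards the result downstream. */
    static final class SafeMap<T, R> implements MiniSubscriber<T> {
        private final MiniSubscriber<? super R> downstream;
        private final Function<? super T, ? extends R> mapper;
        private boolean done; // true once onError or onCompleted has been forwarded

        SafeMap(MiniSubscriber<? super R> downstream, Function<? super T, ? extends R> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override public void onNext(T item) {
            if (isUnsubscribed()) {
                return; // nobody is listening, or a terminal notification was already sent
            }
            R result;
            try {
                result = mapper.apply(item); // caller-supplied code may throw
            } catch (RuntimeException e) {
                onError(e); // report the failure instead of letting it escape
                return;
            }
            downstream.onNext(result);
        }

        @Override public void onError(Throwable t) {
            if (isUnsubscribed()) {
                return;
            }
            done = true;
            downstream.onError(t);
        }

        @Override public void onCompleted() {
            if (isUnsubscribed()) {
                return;
            }
            done = true;
            downstream.onCompleted();
        }

        @Override public boolean isUnsubscribed() {
            return done || downstream.isUnsubscribed();
        }
    }

    /** Feeds a few items through SafeMap, including one that makes the function throw. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        MiniSubscriber<Integer> recorder = new MiniSubscriber<Integer>() {
            @Override public void onNext(Integer item) { events.add("next " + item); }
            @Override public void onError(Throwable t) { events.add("error " + t.getClass().getSimpleName()); }
            @Override public void onCompleted() { events.add("completed"); }
            @Override public boolean isUnsubscribed() { return false; }
        };
        SafeMap<Integer, Integer> op = new SafeMap<Integer, Integer>(recorder, x -> 100 / x);
        op.onNext(50);    // forwarded as 2
        op.onNext(0);     // division by zero is reported through onError
        op.onNext(25);    // dropped: a terminal notification was already sent
        op.onCompleted(); // dropped for the same reason
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next 2, error ArithmeticException]
    }
}
```

The sketch catches only `RuntimeException` so that serious JVM errors are not swallowed and turned into ordinary notifications; whether that is the right boundary for a real operator is a design decision left to the implementer.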