필요하다면, 원하는 Observable의 연산자를 직접 구현할 수 있다. 이 페이지를 통해 어떻게 연산자를 구현하는지 설명한다.
만약, 구현하려는 연산자가 소스 Observable이 배출하는 항목에 반응하거나 항목을 다른 형태로 변환하는 것이 아니라, Observable을 생성하는 거라면 Observable을 직접 구현하려고 애쓰지 말고 대신,
create( ) 메서를 사용하는 것이 좋다. 반대로, 소스 Observable이 배출하는 항목에 반응하는 연산자라면 아래 내용을 살펴보자.
다음 예제는 표준 RxJava 연산자인 lift( )와 사용자 정의 연산자(이 예제에서는 myOperator)를 함께 체인에 적용하는 방법을 보여준다:
Observable foo = barObservable.ofType(Integer).map({it*2}).lift(new myOperator<T>()).map({"transformed by myOperator: " + it});
아래는 여러분이 정의한 연산자가 lift( )와 함께 잘 동작하기 위해서 어떤 구조로 코드를 구현해야 하는지 설명한다.
먼저, 구현하려는 연산자를 Operator 인터페이스를 구현하는 public 클래스로 선언한다. 그럼 아래와 같을 것이다:
public class myOperator<T> implements Operator<T> {
public myOperator( /* 필요한 파라미터 선언 */ ) {
/* 필요한 초기화 코드 구현 */
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> s) {
return new Subscriber<t>(s) {
@Override
public void onCompleted() {
/* onCompleted 시에 필요한 코드를 구현하거나 완료 알림을 그냥 지나친다: */
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable t) {
/* onError 시에 필요한 코드를 구현하거나 오류 알림을 그냥 지나친다: */
if(!s.isUnsubscribed()) {
s.onError(t);
}
}
@Override
public void onNext(T item) {
/* 이 예제는 전달된 항목을 간단히 변환한 후 전달하는 코드를 구현한다 */
if(!s.isUnsubscribed()) {
transformedItem = myOperatorTransformOperation(item);
s.onNext(transformedItem);
}
}
};
}
}
구현할 연산자는 구독자에게 항목을 배출하기 전에(혹은 알림을 보내기 전에) 반드시 구독자의 isUnsubscribed( ) 상태를 체크해야 한다. 구독자가 없는데도 항목을 배출하기 위해 시간을 낭비할 필요가 없다.
onNext( ) 메서드를 수도 없이 많이 호출할 수 있다. 하지만, 절대 중복된 호출이 발생하면 안된다.onCompleted( )나 onError( ) 중 하나를 호출한다. 하지만 둘 모두를 호출해서는 안되며 반드시 둘 중 하나만 한번 호출해야 한다. 그 후에는 구독자의 onNext( )를 호출하지 않을 것이다.serialize( ) 연산자 호출를 추가해서 올바르게 행동하도록 강제할 수 있다.first( ) 연산자는 take(1).single( )를 활용해 만들어 졌다.ignoreElements( ) 연산자는 filter(alwaysFalse( ))를 사용해 만들어 졌다reduce(a) 연산자는 is scan(a).last( )를 사용해 만들어 졌다onError( ) 메서드를 호출해 구독자에게 오류가 발생했음을 알려야 한다.