필요하다면, 원하는 Observable의 연산자를 직접 구현할 수 있다. 이 페이지를 통해 어떻게 연산자를 구현하는지 설명한다.
만약, 구현하려는 연산자가 소스 Observable이 배출하는 항목에 반응하거나 항목을 다른 형태로 변환하는 것이 아니라, Observable을 생성하는 거라면 Observable
을 직접 구현하려고 애쓰지 말고 대신,
create( )
메서를 사용하는 것이 좋다. 반대로, 소스 Observable이 배출하는 항목에 반응하는 연산자라면 아래 내용을 살펴보자.
다음 예제는 표준 RxJava 연산자인 lift( )
와 사용자 정의 연산자(이 예제에서는 myOperator
)를 함께 체인에 적용하는 방법을 보여준다:
Observable foo = barObservable.ofType(Integer).map({it*2}).lift(new myOperator<T>()).map({"transformed by myOperator: " + it});
아래는 여러분이 정의한 연산자가 lift( )
와 함께 잘 동작하기 위해서 어떤 구조로 코드를 구현해야 하는지 설명한다.
먼저, 구현하려는 연산자를 Operator
인터페이스를 구현하는 public 클래스로 선언한다. 그럼 아래와 같을 것이다:
public class myOperator<T> implements Operator<T> {
public myOperator( /* 필요한 파라미터 선언 */ ) {
/* 필요한 초기화 코드 구현 */
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> s) {
return new Subscriber<t>(s) {
@Override
public void onCompleted() {
/* onCompleted 시에 필요한 코드를 구현하거나 완료 알림을 그냥 지나친다: */
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable t) {
/* onError 시에 필요한 코드를 구현하거나 오류 알림을 그냥 지나친다: */
if(!s.isUnsubscribed()) {
s.onError(t);
}
}
@Override
public void onNext(T item) {
/* 이 예제는 전달된 항목을 간단히 변환한 후 전달하는 코드를 구현한다 */
if(!s.isUnsubscribed()) {
transformedItem = myOperatorTransformOperation(item);
s.onNext(transformedItem);
}
}
};
}
}
구현할 연산자는 구독자에게 항목을 배출하기 전에(혹은 알림을 보내기 전에) 반드시 구독자의 isUnsubscribed( )
상태를 체크해야 한다. 구독자가 없는데도 항목을 배출하기 위해 시간을 낭비할 필요가 없다.
onNext( )
메서드를 수도 없이 많이 호출할 수 있다. 하지만, 절대 중복된 호출이 발생하면 안된다.onCompleted( )
나 onError( )
중 하나를 호출한다. 하지만 둘 모두를 호출해서는 안되며 반드시 둘 중 하나만 한번 호출해야 한다. 그 후에는 구독자의 onNext( )
를 호출하지 않을 것이다.serialize( )
연산자 호출를 추가해서 올바르게 행동하도록 강제할 수 있다.first( )
연산자는 take(1)
.
single( )
를 활용해 만들어 졌다.ignoreElements( )
연산자는 filter(alwaysFalse( ))
를 사용해 만들어 졌다reduce(a)
연산자는 is scan(a)
.
last( )
를 사용해 만들어 졌다onError( )
메서드를 호출해 구독자에게 오류가 발생했음을 알려야 한다.