스케줄러

Observable 연산자 체인에 멀티스레딩을 적용하고 싶다면, 특정 스케줄러를 사용해서 연산자(또는 특정 Observable)를 실행하면 된다.

ReactiveX의 일부 Observable 연산자는 사용할 스케줄러를 파라미터로 전달 받기도 하는데, 이 연산자들은 자신이 처리할 연산의 일부 또는 전체를 전달된 스케줄러 내에서 실행한다.

기본적으로, Observable과 연산자 체인은 이처럼 스케줄러를 통해 동작하고 Subscribe 메서드가 호출되는 스레드를 사용해서 옵저버에게 알림을 보낸다. SubscribeOn 연산자는 다른 스케줄러를 지정해서 Observable이 처리해야 할 연산자들을 실행 시킨다. 그리고, ObserveOn 연산자는 Observable이 옵저버에게 알림을 보낼때 사용 할 스케줄러를 명시한다.

아래 그림이 보여주듯, SubscribeOn 연산자는 Observable이 연산을 위해 사용할 스레드를 지정하며, 연산자 체인 중 아무 곳에서 호출해도 문제되지 않는다. 하지만, ObserveOn 연산자는 연산자 체인 중 Observable이 사용할 스레드가 호출 체인 중 어느 시점에서 할당되는지에 따라 그 후에 호출되는 연산자는 영향을 받는다. 그렇기 때문에, 어쩌면 여러분은 특정 연산자를 별도의 스레드에서 실행 시키기 위해 연산자 체인 중 한 군데 이상에서ObserveOn을 호출하게 될 것이다.

ObserveOn and SubscribeOn

참고

언어별 명세:

다양한 스케줄러

Schedulers 클래스의 팩토리 메서드를 사용해서 스케쥴러를 생성한다. 아래의 테이블은 RxGroovy에서 제공하는 메서드와 사용 가능한 스케줄러들을 보여준다;

스케줄러용도
Schedulers.computation( ) 이벤트-루프와 콜백 처리 같은 연산 중심적인 작업을 위해 사용된다; 그렇기 때문에 I/O를 위한 용도로는 사용하지 않아야 한다(대신 Schedulers.io( )를 사용); 기본적으로 스레드의 수는 프로세서의 수와 같다
Schedulers.from(executor) 명시한 Executor를 스케줄러로 사용한다
Schedulers.immediate( ) 현재 스레드에서 즉시 실행할 작업을 스케줄링 한다
Schedulers.io( ) 블러킹 I/O의 비동기 연산 같은 I/O 바운드 작업을 처리한다. 이 스케줄러는 필요한 만큼 증가하는 스레드-풀을 통해 실행된다; 일반적인 연산이 필요한 작업은 Schedulers.computation( )를 사용하면 된다; 기본적으로 Schedulers.io( )는 CachedThreadScheduler이며, CachedThreadScheduler는 스레드 캐싱을 사용하는 새로운 스레드 스케줄러라고 생각하면 된다
Schedulers.newThread( ) 각각의 단위 작업을 위한 새로운 스레드를 생성한다
Schedulers.trampoline( ) 대기 중인 큐를 처리한 후에 현재 스레드에서 실행 할 작업 큐를 만든다

RxGroovy Observable 연산자를 위한 기본 스케줄러

RxGroovy의 일부 Observable 연산자들은 자신이 처리할 연산(또는 최소한 연산의 일부)을 위해 사용할 스케줄러를 지정할 수 있는 기능을 제공한다. 그 외에는 특정 스케줄러 상에서 동작할 수 없거나 또는 특정 기본 스케줄러 상에서만 동작한다. 여기서 설명하는 기본 스케줄러는 아래와 같다:

연산자스케줄러
buffer(timespan)computation
buffer(timespan, count)computation
buffer(timespan, timeshift)computation
debounce(timeout, unit)computation
delay(delay, unit)computation
delaySubscription(delay, unit)computation
intervalcomputation
repeattrampoline
replay(time, unit)computation
replay(buffersize, time, unit)computation
replay(selector, time, unit)computation
replay(selector, buffersize, time, unit)computation
retrytrampoline
sample(period, unit)computation
skip(time, unit)computation
skipLast(time, unit)computation
take(time, unit)computation
takeLast(time, unit)computation
takeLast(count, time, unit)computation
takeLastBuffer(time, unit)computation
takeLastBuffer(count, time, unit)computation
throttleFirstcomputation
throttleLastcomputation
throttleWithTimeoutcomputation
timeIntervalimmediate
timeout(timeoutSelector)immediate
timeout(firstTimeoutSelector, timeoutSelector)immediate
timeout(timeoutSelector, other)immediate
timeout(timeout, timeUnit)computation
timeout(firstTimeoutSelector, timeoutSelector, other)immediate
timeout(timeout, timeUnit, other)computation
timercomputation
timestampimmediate
window(timespan)computation
window(timespan, count)computation
window(timespan, timeshift)computation

테스트 스케줄러

TestScheduler는 스케줄러의 시간에 따른 동작을 정교하게 제어할 수 있는 방법들을 제공하며, 원하는 시점에서 정확하게 동작해야 하는 상황을 테스트 할 때 유용하게 사용된다. 이 스케줄러는 세 개의 메서드를 제공한다:

advanceTimeTo(time,unit)
특정 시간대로 스케줄러의 시간을 앞당긴다
advanceTimeBy(time,unit)
특정 시간만큼 스케줄러의 시간을 앞당긴다
triggerActions( )
스케줄러가 가리키는 시간보다 이전에 혹은 현재까지 예약된, 아직 실행되지 않은 동작들을 시작한다

참고

다양한 스케줄러

Schedulers 클래스의 팩토리 메서드를 사용해서 스케쥴러를 생성한다. 아래의 테이블은 RxJava에서 제공하는 메서드와 사용 가능한 스케줄러들을 보여준다:

스케줄러용도
Schedulers.computation( ) 이벤트-루프와 콜백 처리 같은 연산 중심적인 작업을 위해 사용된다; 그렇기 때문에 I/O를 위한 용도로는 사용하지 말아야 한다(대신 Schedulers.io( )를 사용); 기본적으로 스레드의 수는 프로세서의 수와 같다
Schedulers.from(executor) 명시한 Executor를 스케줄러로 사용한다
Schedulers.immediate( ) 현재 스레드에서 즉시 실행할 작업을 스케줄링 한다
Schedulers.io( ) 블러킹 I/O의 비동기 연산 같은 I/O 바운드 작업을 처리한다. 이 스케줄러는 필요한 만큼 증가하는 스레드-풀을 통해 실행된다; 일반적인 연산이 필요한 작업은 Schedulers.computation( )를 사용하면 된다; 기본적으로 Schedulers.io( )이며 CachedThreadScheduler로, CachedThreadScheduler는 스레드 캐싱을 사용하는 새로운 스레드 스케줄러라고 생각하면 된다
Schedulers.newThread( ) 각각의 단위 작업을 위한 새로운 스레드를 생성한다
Schedulers.trampoline( ) 대기 중인 큐를 처리한 후에 현재 스레드에서 실행 될 작업 큐를 만든다

RxJava Observable 연산자를 위한 기본 스케줄러

RxJava의 일부 Observable 연산자들은 자신이 처리할 연산(또는 최소한 연산의 일부)을 위해 사용할 스케줄러를 지정하는 기능을 제공한다. 그 외에는 특정 스케줄러 상에서 동작할 수 없거나 또는 특정 기본 스케줄러 상에서만 동작한다. 여기서 설명하는 기본 스케줄러는 아래와 같다:

연산자스케줄러
buffer(timespan)computation
buffer(timespan, count)computation
buffer(timespan, timeshift)computation
debounce(timeout, unit)computation
delay(delay, unit)computation
delaySubscription(delay, unit)computation
intervalcomputation
repeattrampoline
replay(time, unit)computation
replay(buffersize, time, unit)computation
replay(selector, time, unit)computation
replay(selector, buffersize, time, unit)computation
retrytrampoline
sample(period, unit)computation
skip(time, unit)computation
skipLast(time, unit)computation
take(time, unit)computation
takeLast(time, unit)computation
takeLast(count, time, unit)computation
takeLastBuffer(time, unit)computation
takeLastBuffer(count, time, unit)computation
throttleFirstcomputation
throttleLastcomputation
throttleWithTimeoutcomputation
timeIntervalimmediate
timeout(timeoutSelector)immediate
timeout(firstTimeoutSelector, timeoutSelector)immediate
timeout(timeoutSelector, other)immediate
timeout(timeout, timeUnit)computation
timeout(firstTimeoutSelector, timeoutSelector, other)immediate
timeout(timeout, timeUnit, other)computation
timercomputation
timestampimmediate
window(timespan)computation
window(timespan, count)computation
window(timespan, timeshift)computation

스케줄러 사용

RxJava의 Observable 연산자에 스케줄러를 전달하는 점만 다를 뿐, 구독 시 여러분이 작성한 코드를 스케줄링 하기 위해서 이 스케줄러들을 그대로 사용할 수 있다. 아래의 예제 코드는 newThread 스케줄러 안에서 작업을 스케줄링 하기 위해 Scheduler.Worker 클래스schedule 메서드를 사용한다:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
    }

});
// 그 후에...
worker.unsubscribe();

재귀 스케줄러

재귀 호출 스케줄링은 schedule로 스케줄링 한 후에, 작업자 객체 안에서 schedule(this)를 호출한다:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// 그 후에...
worker.unsubscribe();

구독 해지 상태 체크 또는 설정

Worker 클래스의 객체들은 Subscription 인터페이스의 isUnsubscribedunsubscribe 메서드를 구현하기 때문에, 구독이 취소 됐을 때 작업을 중지 할 수 있을 뿐만 아니라, 예약된 작업 내에서도 구독을 취소 할 수 있다:

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }

});

Worker 역시 하나의 Subscription이기 때문에(결과적으로 반드시) 작업을 중지시키고 리소스를 반환할 수 있도록 unsubscribe 메서드를 호출해야 한다:

worker.unsubscribe();

지연 그리고 주기적 스케줄러

일정 시간이 경과할 때까지 주어진 스케줄러 상에서 지정한 동작을 지연시킬 수 있도록 다른 형태의 schedule 메서드 사용도 가능하다. 아래 예제 코드는 스케줄러의 시간이 500ms 경과한 후에 someScheduler에서 someAction을 실행시키는 스케줄링 작업을 정의한다:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

또 다른 Scheduler 메서드는 특정 시간마다 지정된 동작을 실행하는 스케줄을 정의한다. 아래의 예제는 500ms가 경과한 후, 매 250ms 마다 someScheduler에서 someAction를 실행한다:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

테스트 스케줄러

TestScheduler는 스케줄러의 시간에 따른 동작을 정교하게 제어할 수 있는 방법들을 제공하는데, 원하는 시점에 정확하게 동작해야 하는 상황을 테스트 할 때 유용하게 사용된다. 이 스케줄러는 세 개의 메서드를 제공한다:

advanceTimeTo(time,unit)
특정 시간대로 스케줄러의 시간을 앞당긴다
advanceTimeBy(time,unit)
특정 시간만큼 스케줄러의 시간을 앞으로 당긴다
triggerActions( )
스케줄러가 가리키는 시간보다 이전에 혹은 현재까지 예약된, 아직 실행되지 않은 동작들을 시작한다

참고

RxJS에서는 Rx.Scheduler 또는 독립적으로 구현한 객체를 통해 스케줄러를 생성한다. 아래의 테이블은 RxJS에서 사용 가능한 스케줄러들을 보여준다:

스케줄러용도
Rx.Scheduler.currentThread 현재 스레드에서 가능한 빠르게 작업을 스케줄링 한다
Rx.HistoricalScheduler 과거의 특정 시점을 지정해서 마치 지정된 과거에서부터 실행되고 있었던 것처럼 작업을 스케줄링 한다
Rx.Scheduler.immediate 현재 스레드에서 작업을 즉시 스케줄링 한다
Rx.TestScheduler 단위 테스트를 위해 사용되며, 시간의 흐름을 조절할 수 있다
Rx.Scheduler.timeout 지정된 콜백을 사용해서 작업을 스케줄링 한다

참고

TBD

TBD