Schedulers.io( ) |
블러킹 I/O의 비동기 연산 같은 I/O 바운드 작업을 처리한다. 이 스케줄러는 필요한 만큼 증가하는 스레드-풀을 통해 실행된다; 일반적인 연산이 필요한 작업은 Schedulers.computation( ) 를 사용하면 된다; 기본적으로 Schedulers.io( )는 CachedThreadScheduler 이며, CachedThreadScheduler 는 스레드 캐싱을 사용하는 새로운 스레드 스케줄러라고 생각하면 된다</td></tr>
Schedulers.newThread( ) |
각각의 단위 작업을 위한 새로운 스레드를 생성한다 |
Schedulers.trampoline( ) |
대기 중인 큐를 처리한 후에 현재 스레드에서 실행 할 작업 큐를 만든다 |
</tbody>
</table>
RxGroovy Observable 연산자를 위한 기본 스케줄러
RxGroovy의 일부 Observable 연산자들은 자신이 처리할 연산(또는 최소한 연산의 일부)을 위해 사용할 스케줄러를 지정할 수 있는 기능을 제공한다. 그 외에는 특정 스케줄러 상에서 동작할 수 없거나 또는 특정 기본 스케줄러 상에서만 동작한다. 여기서 설명하는 기본 스케줄러는 아래와 같다:
연산자 | 스케줄러 |
buffer(timespan) | computation |
buffer(timespan, count) | computation |
buffer(timespan, timeshift) | computation |
debounce(timeout, unit) | computation |
delay(delay, unit) | computation |
delaySubscription(delay, unit) | computation |
interval | computation |
repeat | trampoline |
replay(time, unit) | computation |
replay(buffersize, time, unit) | computation |
replay(selector, time, unit) | computation |
replay(selector, buffersize, time, unit) | computation |
retry | trampoline |
sample(period, unit) | computation |
skip(time, unit) | computation |
skipLast(time, unit) | computation |
take(time, unit) | computation |
takeLast(time, unit) | computation |
takeLast(count, time, unit) | computation |
takeLastBuffer(time, unit) | computation |
takeLastBuffer(count, time, unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector, timeoutSelector) | immediate |
timeout(timeoutSelector, other) | immediate |
timeout(timeout, timeUnit) | computation |
timeout(firstTimeoutSelector, timeoutSelector, other) | immediate |
timeout(timeout, timeUnit, other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan, count) | computation |
window(timespan, timeshift) | computation |
테스트 스케줄러
이
TestScheduler 는 스케줄러의 시간에 따른 동작을 정교하게 제어할 수 있는 방법들을 제공하며, 원하는 시점에서 정확하게 동작해야 하는 상황을 테스트 할 때 유용하게 사용된다. 이 스케줄러는 세 개의 메서드를 제공한다:
advanceTimeTo(time,unit)
- 특정 시간대로 스케줄러의 시간을 앞당긴다
advanceTimeBy(time,unit)
- 특정 시간만큼 스케줄러의 시간을 앞당긴다
triggerActions( )
- 스케줄러가 가리키는 시간보다 이전에 혹은 현재까지 예약된, 아직 실행되지 않은 동작들을 시작한다
참고
</div>
</div>
</div>
다양한 스케줄러
Schedulers 클래스의 팩토리 메서드를 사용해서 스케쥴러를 생성한다. 아래의 테이블은 RxJava에서 제공하는 메서드와 사용 가능한 스케줄러들을 보여준다:
스케줄러 | 용도 |
Schedulers.computation( ) |
이벤트-루프와 콜백 처리 같은 연산 중심적인 작업을 위해 사용된다; 그렇기 때문에 I/O를 위한 용도로는 사용하지 말아야 한다(대신 Schedulers.io( ) 를 사용); 기본적으로 스레드의 수는 프로세서의 수와 같다 |
Schedulers.from(executor) |
명시한 Executor 를 스케줄러로 사용한다 |
Schedulers.immediate( ) |
현재 스레드에서 즉시 실행할 작업을 스케줄링 한다 |
Schedulers.io( ) |
블러킹 I/O의 비동기 연산 같은 I/O 바운드 작업을 처리한다. 이 스케줄러는 필요한 만큼 증가하는 스레드-풀을 통해 실행된다; 일반적인 연산이 필요한 작업은 Schedulers.computation( ) 를 사용하면 된다; 기본적으로 Schedulers.io( )이며 CachedThreadScheduler 로, CachedThreadScheduler 는 스레드 캐싱을 사용하는 새로운 스레드 스케줄러라고 생각하면 된다</td></tr>
Schedulers.newThread( ) |
각각의 단위 작업을 위한 새로운 스레드를 생성한다 |
Schedulers.trampoline( ) |
대기 중인 큐를 처리한 후에 현재 스레드에서 실행 될 작업 큐를 만든다 |
</tbody>
</table>
RxJava Observable 연산자를 위한 기본 스케줄러
RxJava의 일부 Observable 연산자들은 자신이 처리할 연산(또는 최소한 연산의 일부)을 위해 사용할 스케줄러를 지정하는 기능을 제공한다. 그 외에는 특정 스케줄러 상에서 동작할 수 없거나 또는 특정 기본 스케줄러 상에서만 동작한다. 여기서 설명하는 기본 스케줄러는 아래와 같다:
연산자 | 스케줄러 |
buffer(timespan) | computation |
buffer(timespan, count) | computation |
buffer(timespan, timeshift) | computation |
debounce(timeout, unit) | computation |
delay(delay, unit) | computation |
delaySubscription(delay, unit) | computation |
interval | computation |
repeat | trampoline |
replay(time, unit) | computation |
replay(buffersize, time, unit) | computation |
replay(selector, time, unit) | computation |
replay(selector, buffersize, time, unit) | computation |
retry | trampoline |
sample(period, unit) | computation |
skip(time, unit) | computation |
skipLast(time, unit) | computation |
take(time, unit) | computation |
takeLast(time, unit) | computation |
takeLast(count, time, unit) | computation |
takeLastBuffer(time, unit) | computation |
takeLastBuffer(count, time, unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector, timeoutSelector) | immediate |
timeout(timeoutSelector, other) | immediate |
timeout(timeout, timeUnit) | computation |
timeout(firstTimeoutSelector, timeoutSelector, other) | immediate |
timeout(timeout, timeUnit, other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan, count) | computation |
window(timespan, timeshift) | computation |
스케줄러 사용
RxJava의 Observable 연산자에 스케줄러를 전달하는 점만 다를 뿐, 구독 시 여러분이 작성한 코드를 스케줄링 하기 위해서 이 스케줄러들을 그대로 사용할 수 있다. 아래의 예제 코드는 newThread 스케줄러 안에서 작업을 스케줄링 하기 위해 Scheduler.Worker 클래스의 schedule 메서드를 사용한다:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
}
});
// 그 후에...
worker.unsubscribe();
재귀 스케줄러
재귀 호출 스케줄링은 schedule 로 스케줄링 한 후에, 작업자 객체 안에서 schedule(this) 를 호출한다:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
// recurse until unsubscribed (schedule will do nothing if unsubscribed)
worker.schedule(this);
}
});
// 그 후에...
worker.unsubscribe();
구독 해지 상태 체크 또는 설정
Worker 클래스의 객체들은 Subscription 인터페이스의 isUnsubscribed 와 unsubscribe
메서드를 구현하기 때문에, 구독이 취소 됐을 때 작업을 중지 할 수 있을 뿐만 아니라, 예약된 작업 내에서도 구독을 취소 할 수 있다:
Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {
@Override
public void call() {
while(!worker.isUnsubscribed()) {
status = yourWork();
if(QUIT == status) { worker.unsubscribe(); }
}
}
});
Worker 역시 하나의 Subscription 이기 때문에(결과적으로 반드시) 작업을 중지시키고 리소스를 반환할 수 있도록 unsubscribe 메서드를 호출해야 한다:
지연 그리고 주기적 스케줄러
일정 시간이 경과할 때까지 주어진 스케줄러 상에서 지정한 동작을 지연시킬 수 있도록 다른 형태의 schedule 메서드 사용도 가능하다. 아래 예제 코드는 스케줄러의 시간이 500ms 경과한 후에 someScheduler 에서 someAction 을 실행시키는 스케줄링 작업을 정의한다:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
또 다른 Scheduler 메서드는 특정 시간마다 지정된 동작을 실행하는 스케줄을 정의한다. 아래의 예제는 500ms가 경과한 후, 매 250ms 마다 someScheduler 에서 someAction 를 실행한다:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
테스트 스케줄러
이
TestScheduler 는 스케줄러의 시간에 따른 동작을 정교하게 제어할 수 있는 방법들을 제공하는데, 원하는 시점에 정확하게 동작해야 하는 상황을 테스트 할 때 유용하게 사용된다. 이 스케줄러는 세 개의 메서드를 제공한다:
advanceTimeTo(time,unit)
- 특정 시간대로 스케줄러의 시간을 앞당긴다
advanceTimeBy(time,unit)
- 특정 시간만큼 스케줄러의 시간을 앞으로 당긴다
triggerActions( )
- 스케줄러가 가리키는 시간보다 이전에 혹은 현재까지 예약된, 아직 실행되지 않은 동작들을 시작한다
참고
</div>
</div>
</div>
RxJS에서는 Rx.Scheduler 또는 독립적으로 구현한 객체를 통해 스케줄러를 생성한다. 아래의 테이블은 RxJS에서 사용 가능한 스케줄러들을 보여준다:
스케줄러 | 용도 |
Rx.Scheduler.currentThread |
현재 스레드에서 가능한 빠르게 작업을 스케줄링 한다 |
Rx.HistoricalScheduler |
과거의 특정 시점을 지정해서 마치 지정된 과거에서부터 실행되고 있었던 것처럼 작업을 스케줄링 한다 |
Rx.Scheduler.immediate |
현재 스레드에서 작업을 즉시 스케줄링 한다 |
Rx.TestScheduler |
단위 테스트를 위해 사용되며, 시간의 흐름을 조절할 수 있다 |
Rx.Scheduler.timeout |
지정된 콜백을 사용해서 작업을 스케줄링 한다 |
참고
</div>
| |