Scheduler

If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.

Some ReactiveX Observable operators have variants that take a Scheduler as a parameter. These instruct the operator to do some or all of its work on a particular Scheduler.

By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

As shown in this illustration, the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.

ObserveOn and SubscribeOn

See Also

Language-Specific Information:

Varieties of Scheduler

You obtain a Scheduler from the factory methods described in the Schedulers class. The following table shows the varieties of Scheduler that are available to you by means of these methods in RxGroovy:

Schedulerpurpose
Schedulers.computation( )meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors
Schedulers.from(executor)uses the specified Executor as a Scheduler
Schedulers.immediate( )schedules work to begin immediately in the current thread
Schedulers.io( )meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( ); Schedulers.io( ) by default is a CachedThreadScheduler, which is something like a new thread scheduler with thread caching
Schedulers.newThread( )creates a new thread for each unit of work
Schedulers.trampoline( )queues work to begin on the current thread after any already-queued work

Default Schedulers for RxGroovy Observable Operators

Some Observable operators in RxGroovy have alternate forms that allow you to set which Scheduler the operator will use for (at least some part of) its operation. Others do not operate on any particular Scheduler, or operate on a particular default Scheduler. Those that have a particular default Scheduler include:

operatorScheduler
buffer(timespan)computation
buffer(timespan, count)computation
buffer(timespan, timeshift)computation
debounce(timeout, unit)computation
delay(delay, unit)computation
delaySubscription(delay, unit)computation
intervalcomputation
repeattrampoline
replay(time, unit)computation
replay(buffersize, time, unit)computation
replay(selector, time, unit)computation
replay(selector, buffersize, time, unit)computation
retrytrampoline
sample(period, unit)computation
skip(time, unit)computation
skipLast(time, unit)computation
take(time, unit)computation
takeLast(time, unit)computation
takeLast(count, time, unit)computation
takeLastBuffer(time, unit)computation
takeLastBuffer(count, time, unit)computation
throttleFirstcomputation
throttleLastcomputation
throttleWithTimeoutcomputation
timeIntervalimmediate
timeout(timeoutSelector)immediate
timeout(firstTimeoutSelector, timeoutSelector)immediate
timeout(timeoutSelector, other)immediate
timeout(timeout, timeUnit)computation
timeout(firstTimeoutSelector, timeoutSelector, other)immediate
timeout(timeout, timeUnit, other)computation
timercomputation
timestampimmediate
window(timespan)computation
window(timespan, count)computation
window(timespan, timeshift)computation

Test Scheduler

The TestScheduler allows you to exercise fine-tuned manual control over how the Scheduler’s clock behaves. This can be useful for testing interactions that depend on precise arrangements of actions in time. This Scheduler has three additional methods:

advanceTimeTo(time,unit)
advances the Scheduler’s clock to a particular point in time
advanceTimeBy(time,unit)
advances the Scheduler’s clock forward by a particular amount of time
triggerActions( )
start any unstarted actions that have been scheduled for a time equal to or earlier than the present time according to the Scheduler’s clock

See Also

Varieties of Scheduler

You obtain a Scheduler from the factory methods described in the Schedulers class. The following table shows the varieties of Scheduler that are available to you by means of these methods in RxJava:

Schedulerpurpose
Schedulers.computation( )meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors
Schedulers.from(executor)uses the specified Executor as a Scheduler
Schedulers.immediate( )schedules work to begin immediately in the current thread
Schedulers.io( )meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( ); Schedulers.io( ) by default is a CachedThreadScheduler, which is something like a new thread scheduler with thread caching
Schedulers.newThread( )creates a new thread for each unit of work
Schedulers.trampoline( )queues work to begin on the current thread after any already-queued work

Default Schedulers for RxJava 1.x Observable Operators

Some Observable operators in RxJava have alternate forms that allow you to set which Scheduler the operator will use for (at least some part of) its operation. Others do not operate on any particular Scheduler, or operate on a particular default Scheduler. Those that have a particular default Scheduler include:

operatorScheduler
buffer(timespan)computation
buffer(timespan, count)computation
buffer(timespan, timeshift)computation
debounce(timeout, unit)computation
delay(delay, unit)computation
delaySubscription(delay, unit)computation
intervalcomputation
repeattrampoline
replay(time, unit)computation
replay(buffersize, time, unit)computation
replay(selector, time, unit)computation
replay(selector, buffersize, time, unit)computation
retrytrampoline
sample(period, unit)computation
skip(time, unit)computation
skipLast(time, unit)computation
take(time, unit)computation
takeLast(time, unit)computation
takeLast(count, time, unit)computation
takeLastBuffer(time, unit)computation
takeLastBuffer(count, time, unit)computation
throttleFirstcomputation
throttleLastcomputation
throttleWithTimeoutcomputation
timeIntervalimmediate
timeout(timeoutSelector)immediate
timeout(firstTimeoutSelector, timeoutSelector)immediate
timeout(timeoutSelector, other)immediate
timeout(timeout, timeUnit)computation
timeout(firstTimeoutSelector, timeoutSelector, other)immediate
timeout(timeout, timeUnit, other)computation
timercomputation
timestampimmediate
window(timespan)computation
window(timespan, count)computation
window(timespan, timeshift)computation

Using Schedulers

Aside from passing these Schedulers in to RxJava Observable operators, you can also use them to schedule your own work on Subscriptions. The following example uses the schedule method of the Scheduler.Worker class to schedule work on the newThread Scheduler:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
    }

});
// some time later...
worker.unsubscribe();

Recursive Schedulers

To schedule recursive calls, you can use schedule and then schedule(this) on the Worker object:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// some time later...
worker.unsubscribe();

Checking or Setting Unsubscribed Status

Objects of the Worker class implement the Subscription interface, with its isUnsubscribed and unsubscribe methods, so you can stop work when a subscription is cancelled, or you can cancel the subscription from within the scheduled task:

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }

});

The Worker is also a Subscription and so you can (and should, eventually) call its unsubscribe method to signal that it can halt work and release resources:

worker.unsubscribe();

Delayed and Periodic Schedulers

You can also use a version of schedule that delays your action on the given Scheduler until a certain timespan has passed. The following example schedules someAction to be performed on someScheduler after 500ms have passed according to that Scheduler’s clock:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

Another Scheduler method allows you to schedule an action to take place at regular intervals. The following example schedules someAction to be performed on someScheduler after 500ms have passed, and then every 250ms thereafter:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

Test Scheduler

The TestScheduler allows you to exercise fine-tuned manual control over how the Scheduler’s clock behaves. This can be useful for testing interactions that depend on precise arrangements of actions in time. This Scheduler has three additional methods:

advanceTimeTo(time,unit)
advances the Scheduler’s clock to a particular point in time
advanceTimeBy(time,unit)
advances the Scheduler’s clock forward by a particular amount of time
triggerActions( )
start any unstarted actions that have been scheduled for a time equal to or earlier than the present time according to the Scheduler’s clock

See Also

In RxJS you obtain Schedulers from the Rx.Scheduler object or as independently-implemented objects. The following table shows the varieties of Scheduler that are available to you in RxJS:.

Schedulerpurpose
Rx.Scheduler.currentThreadschedules work as soon as possible on the current thread
Rx.HistoricalSchedulerschedules work as though it were occurring at an arbitrary historical time
Rx.Scheduler.immediateschedules work immediately on the current thread
Rx.TestSchedulerfor unit testing; this allows you to manually manipulate the movement of time
Rx.Scheduler.timeoutschedules work by means of a timed callback

See Also

TBD

TBD