public abstract class Scheduler extends Object
A Scheduler is an object that specifies an API for scheduling units of work, provided in the form of Runnables, to be executed without delay (effectively as soon as possible), after a specified time delay, or periodically. It represents an abstraction over an asynchronous boundary that ensures these units of work get executed by some underlying task-execution scheme (such as custom Threads, an event loop, an Executor or an Actor system) with some uniform properties and guarantees regardless of the particular underlying scheme.
You can get various standard, RxJava-specific instances of this class via the static methods of the Schedulers utility class.
The so-called Scheduler.Workers of a Scheduler can be created via the createWorker() method, which allows the scheduling of multiple Runnable tasks in an isolated manner. Runnable tasks scheduled on a Worker are guaranteed to be executed sequentially and in a non-overlapping fashion. Non-delayed Runnable tasks are guaranteed to execute in First-In-First-Out order, but their execution may be interleaved with delayed tasks. In addition, outstanding or running tasks can be cancelled together via Disposable.dispose() without affecting any other Worker instances of the same Scheduler.
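The sequential, First-In-First-Out guarantee for non-delayed tasks can be illustrated with a JDK-only analogy. The sketch below is not RxJava code: it assumes a single-threaded Executor as the underlying task-execution scheme, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy: one single-threaded executor plays the role of one Worker.
final class SequentialWorkerSketch {

    /** Submits {@code count} non-delayed tasks and returns the order in which they ran. */
    static List<Integer> runInOrder(int count) {
        List<Integer> executed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService worker = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            final int id = i;
            // Tasks run one at a time, in submission order, on the single thread.
            worker.execute(() -> executed.add(id));
        }
        worker.shutdown(); // already submitted tasks still run to completion
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

A second executor created the same way would run its own tasks independently, which mirrors the isolation between Worker instances of the same Scheduler.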
Implementations of the scheduleDirect(java.lang.Runnable) and Scheduler.Worker.schedule(java.lang.Runnable) methods are encouraged to call the RxJavaPlugins.onSchedule(Runnable) method to allow a scheduler hook to manipulate (wrap or replace) the original Runnable task before it is submitted to the underlying task-execution scheme.
The default implementations of the scheduleDirect methods provided by this abstract class delegate to the respective schedule methods in the Scheduler.Worker instance created via createWorker() for each individual Runnable task submitted. Implementors of this class are encouraged to provide a more efficient direct scheduling implementation to avoid the time and memory overhead of creating such Workers for every task.
This delegation is done via special wrapper instances around the original Runnable before calling the respective Worker.schedule method. Note that this can lead to multiple RxJavaPlugins.onSchedule calls and potentially multiple hooks applied. Therefore, the default implementations of scheduleDirect (and the Scheduler.Worker.schedulePeriodically(Runnable, long, long, TimeUnit)) wrap the incoming Runnable into a class that implements the SchedulerRunnableIntrospection interface, which can grant access to the original or hooked Runnable. Thus, a repeated RxJavaPlugins.onSchedule call can detect the earlier hook and avoid applying a new one.
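The idea of detecting an earlier hook can be sketched without RxJava. In the sketch below, the WrappedRunnable interface, the Hooked class and the onSchedule method are hypothetical stand-ins defined locally for illustration; they are not the library's types.

```java
// Hypothetical illustration of "hook once" detection via an introspection interface.
final class HookOnceSketch {

    /** Local stand-in for an introspection interface; NOT the RxJava type. */
    interface WrappedRunnable extends Runnable {
        Runnable getWrapped();
    }

    /** Wrapper produced by the hypothetical hook; it exposes the original task. */
    static final class Hooked implements WrappedRunnable {
        private final Runnable original;

        Hooked(Runnable original) {
            this.original = original;
        }

        @Override
        public void run() {
            original.run();
        }

        @Override
        public Runnable getWrapped() {
            return original;
        }
    }

    /** Hypothetical hook: wraps a task unless it is already a recognizable wrapper. */
    static Runnable onSchedule(Runnable task) {
        if (task instanceof WrappedRunnable) {
            return task; // an earlier hook was detected, do not wrap again
        }
        return new Hooked(task);
    }
}
```

Calling onSchedule on its own result returns the same instance, so a task that passes through the hook twice is only wrapped once.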
The default implementations of the now(TimeUnit) and Scheduler.Worker.now(TimeUnit) methods return the current System.currentTimeMillis() value in the desired time unit, unless the rx2.scheduler.use-nanotime (boolean) system property is set. When the property is set to true, the methods use System.nanoTime() as their basis instead. Custom Scheduler implementations can override this to provide specialized time accounting (such as virtual time to be advanced programmatically).
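As a rough sketch of the behavior described above, the choice between the two clocks and the unit conversion can be written with JDK calls only. The class and method names are hypothetical, and the property is read on every call here purely for simplicity; this is an assumption of the sketch, not a statement about how the library reads it.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a now(TimeUnit) computation with a switchable time basis.
final class NowSketch {

    /** Converts the selected raw clock reading into the requested unit. */
    static long computeNow(TimeUnit unit, boolean useNanoTime,
                           long currentTimeMillis, long nanoTime) {
        return useNanoTime
                ? unit.convert(nanoTime, TimeUnit.NANOSECONDS)
                : unit.convert(currentTimeMillis, TimeUnit.MILLISECONDS);
    }

    /** Reads the system property named in the text and the two JDK clocks. */
    static long now(TimeUnit unit) {
        boolean useNanoTime = Boolean.getBoolean("rx2.scheduler.use-nanotime");
        return computeNow(unit, useNanoTime, System.currentTimeMillis(), System.nanoTime());
    }

    public static void main(String[] args) {
        System.out.println(now(TimeUnit.SECONDS));
    }
}
```

Keeping the conversion in a separate method that takes the raw readings as arguments makes the logic easy to check with fixed values, which is also how a virtual-time variant could be exercised.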
Note that operators requiring a Scheduler may rely on either of the now() calls provided by Scheduler or Worker, respectively; therefore, it is recommended that they represent a logically consistent source of the current time.
The default implementation of the Scheduler.Worker.schedulePeriodically(Runnable, long, long, TimeUnit) method uses the Scheduler.Worker.schedule(Runnable, long, TimeUnit) method for scheduling the Runnable task periodically. The algorithm calculates the next absolute time when the task should run again and schedules this execution based on the relative time between it and Scheduler.Worker.now(TimeUnit). However, drifts or changes in the system clock could affect this calculation either by scheduling subsequent runs too frequently or too far apart. Therefore, the default implementation uses the clockDriftTolerance() value (set via rx2.scheduler.drift-tolerance in minutes) to detect a drift in Scheduler.Worker.now(TimeUnit) and re-adjust the absolute/relative time calculation accordingly.
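One way to picture the calculation is the following JDK-only sketch. It follows the description above (next absolute time, relative delay, re-basing when the clock jumps by more than a tolerance), but the class, field and method names are hypothetical and the details should be read as an assumption rather than as the library's actual code.

```java
// Hypothetical sketch of fixed-rate rescheduling with clock-drift detection.
// All times are in nanoseconds.
final class PeriodicDriftSketch {
    private final long periodNanos;
    private final long toleranceNanos;
    private long startNanos;   // logical time of the first execution
    private long lastNowNanos; // clock reading seen at the previous calculation
    private long count;        // number of executions scheduled so far

    PeriodicDriftSketch(long firstNowNanos, long initialDelayNanos,
                        long periodNanos, long toleranceNanos) {
        this.startNanos = firstNowNanos + initialDelayNanos;
        this.lastNowNanos = firstNowNanos;
        this.periodNanos = periodNanos;
        this.toleranceNanos = toleranceNanos;
    }

    /** Returns the relative delay until the next run, given the current clock reading. */
    long nextDelay(long nowNanos) {
        long nextTick;
        boolean jumpedBack = nowNanos + toleranceNanos < lastNowNanos;
        boolean jumpedAhead = nowNanos >= lastNowNanos + periodNanos + toleranceNanos;
        if (jumpedBack || jumpedAhead) {
            // The clock moved too far: re-base the schedule on the current reading.
            nextTick = nowNanos + periodNanos;
            startNanos = nextTick - periodNanos * (++count);
        } else {
            // Normal case: stay on the fixed-rate grid start + n * period.
            nextTick = startNanos + (++count) * periodNanos;
        }
        lastNowNanos = nowNanos;
        return nextTick - nowNanos;
    }
}
```

With a period of 100 and a tolerance of 1000, clock readings of 10 and 105 yield delays of 90 and 95 (the runs stay on the 100, 200, ... grid), whereas a sudden reading of 5000 is treated as a drift and yields a full period of 100 from the new reading.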
The default implementations of start() and shutdown() do nothing and should be overridden if the underlying task-execution scheme supports stopping and restarting itself.
If the Scheduler is shut down or a Worker is disposed, the schedule methods should return the Disposables.disposed() singleton instance, indicating the shut down/disposed state to the caller. Since the shutdown or dispose can happen from any thread, the schedule implementations should make a best effort to cancel tasks immediately after those tasks have been submitted to the underlying task-execution scheme if the shutdown/dispose was detected after this submission.
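The check-submit-recheck pattern described above can be sketched with JDK types. This is an analogy under stated assumptions: a Future stands in for the returned Disposable, a pre-cancelled singleton Future stands in for the disposed singleton, and all names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical worker-like class showing best-effort cancellation around disposal.
final class DisposeAwareWorkerSketch {

    /** Shared, already-cancelled handle that signals "this worker is disposed". */
    static final Future<?> DISPOSED = cancelledFuture();

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean disposed = new AtomicBoolean();

    private static Future<?> cancelledFuture() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.cancel(false);
        return future;
    }

    Future<?> schedule(Runnable task) {
        if (disposed.get()) {
            return DISPOSED; // disposal detected before submission
        }
        Future<?> handle;
        try {
            handle = executor.submit(task);
        } catch (RejectedExecutionException e) {
            return DISPOSED; // the executor was shut down concurrently
        }
        if (disposed.get()) {
            handle.cancel(true); // disposal detected after submission: best-effort cancel
        }
        return handle;
    }

    void dispose() {
        if (disposed.compareAndSet(false, true)) {
            executor.shutdownNow(); // cancels outstanding tasks, interrupts a running one
        }
    }

    boolean isDisposed() {
        return disposed.get();
    }
}
```

The second check after submit covers the window in which another thread disposes the worker between the first check and the submission.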
All methods on the Scheduler and Worker classes should be thread safe.
Modifier and Type | Class and Description |
---|---|
static class | Scheduler.Worker: Represents an isolated, sequential worker of a parent Scheduler for executing Runnable tasks on an underlying task-execution scheme (such as custom Threads, event loop, Executor or Actor system). |
Constructor and Description |
---|
Scheduler() |
Modifier and Type | Method and Description |
---|---|
static long | clockDriftTolerance(): Returns the clock drift tolerance in nanoseconds. |
abstract Scheduler.Worker | createWorker(): Retrieves or creates a new Scheduler.Worker that represents sequential execution of actions. |
long | now(TimeUnit unit): Returns the 'current time' of the Scheduler in the specified time unit. |
Disposable | scheduleDirect(Runnable run): Schedules the given task on this Scheduler without any time delay. |
Disposable | scheduleDirect(Runnable run, long delay, TimeUnit unit): Schedules the execution of the given task with the given time delay. |
Disposable | schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit): Schedules a periodic execution of the given task with the given initial time delay and repeat period. |
void | shutdown(): Instructs the Scheduler instance to stop threads, stop accepting tasks on any outstanding Scheduler.Worker instances and clean up any associated resources with this Scheduler. |
void | start(): Allows the Scheduler instance to start threads and accept tasks on them. |
<S extends Scheduler & Disposable> S | when(Function<Flowable<Flowable<Completable>>,Completable> combine): Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done. |
public static long clockDriftTolerance()
Returns the clock drift tolerance in nanoseconds.
Related system property: rx2.scheduler.drift-tolerance in minutes.
@NonNull public abstract Scheduler.Worker createWorker()
Retrieves or creates a new Scheduler.Worker that represents sequential execution of actions.
When work is completed, the Worker instance should be released by calling Disposable.dispose() to avoid potential resource leaks in the underlying task-execution scheme.
Work on a Scheduler.Worker is guaranteed to be sequential and non-overlapping.
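public long now(@NonNull TimeUnit unit)
Returns the 'current time' of the Scheduler in the specified time unit.
unit - the time unit
public void start()
Allows the Scheduler instance to start threads and accept tasks on them.
Implementations should make sure the call is idempotent and thread-safe, and should not throw any RuntimeException if they do not support this functionality.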
public void shutdown()
Instructs the Scheduler instance to stop threads, stop accepting tasks on any outstanding Scheduler.Worker instances and clean up any associated resources with this Scheduler.
Implementations should make sure the call is idempotent and thread-safe, and should not throw any RuntimeException if they do not support this functionality.
@NonNull public Disposable scheduleDirect(@NonNull Runnable run)
Schedules the given task on this Scheduler without any time delay.
This method is safe to be called from multiple threads, but there are no ordering or non-overlapping guarantees between tasks.
run - the task to execute
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit)
Schedules the execution of the given task with the given time delay.
This method is safe to be called from multiple threads, but there are no ordering guarantees between tasks.
run - the task to schedule
delay - the delay amount; non-positive values indicate non-delayed scheduling
unit - the unit of measure of the delay amount
@NonNull public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit)
Schedules a periodic execution of the given task with the given initial time delay and repeat period.
This method is safe to be called from multiple threads, but there are no ordering guarantees between tasks.
The periodic execution is at a fixed rate, that is, the first execution will be after the initialDelay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.
run - the task to schedule
initialDelay - the initial delay amount; non-positive values indicate non-delayed scheduling
period - the period at which the task should be re-executed
unit - the unit of measure of the delay amount
@NonNull public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>,Completable> combine)
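Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done. This makes it possible to layer additional behavior on this Scheduler. The only parameter is a function that flattens a Flowable of Flowable of Completables into just one Completable. There must be a chain of operators connecting the returned value to the source Flowable, otherwise any work scheduled on the returned Scheduler will not be executed.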
When createWorker() is invoked, a Flowable of Completables is onNext'd to the combinator to be flattened. If the inner Flowable is not immediately subscribed to, any calls to Scheduler.Worker.schedule(java.lang.Runnable) are buffered. Once the Flowable is subscribed to, actions are then onNext'd as Completables.
Finally, the actions are scheduled on the parent Scheduler when the innermost Completables are subscribed to.
When the Scheduler.Worker is unsubscribed, the Completable emits an onComplete and triggers any behavior in the flattening operator. The Flowable and all Completables given to the flattening function never onError.
Limit the amount of concurrency to two at a time without creating a new fixed-size thread pool:

```java
Scheduler limitScheduler = Schedulers.computation().when(workers -> {
    // use merge max concurrent to limit the number of concurrent
    // callbacks two at a time
    return Completable.merge(Flowable.merge(workers), 2);
});
```
This is a slightly different way to limit the concurrency, but it has some interesting benefits and drawbacks compared to the method above. It works by limiting the number of concurrent Scheduler.Workers rather than individual actions. Generally, each Flowable uses its own Scheduler.Worker. This means that this will essentially limit the number of concurrent subscribes. The danger comes from using operators like Flowable.zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.functions.BiFunction), where subscribing to the first Flowable could deadlock the subscription to the second.
```java
Scheduler limitScheduler = Schedulers.computation().when(workers -> {
    // use merge max concurrent to limit the number of concurrent
    // Flowables two at a time
    return Completable.merge(Flowable.merge(workers, 2));
});
```

Slowing down the rate to no more than 1 a second. This suffers from the same problem as the one above, since I could not find a Flowable operator that limits the rate without dropping the values (also known as the leaky bucket algorithm).
```java
Scheduler slowScheduler = Schedulers.computation().when(workers -> {
    // use concatenate to make each worker happen one at a time.
    return Completable.concat(workers.map(actions -> {
        // delay the starting of the next worker by 1 second.
        return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
    }));
});
```
History: 2.0.1 - experimental
S - a Scheduler and a Disposable
combine - the function that takes a two-level nested Flowable sequence of a Completable and returns the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.