public final class Schedulers extends Object
Scheduler
instances.
The initial and runtime values of the various scheduler types can be overridden via the
RxJavaPlugins.setInit(scheduler name)SchedulerHandler()
and
RxJavaPlugins.set(scheduler name)SchedulerHandler()
respectively.
Note that overriding any initial Scheduler
via the RxJavaPlugins
has to happen before the Schedulers
class is accessed.
Supported system properties (System.getProperty()
):
rx3.io-keep-alive-time
(long): sets the keep-alive time of the io()
Scheduler
workers, default is IoScheduler.KEEP_ALIVE_TIME_DEFAULT
rx3.io-priority
(int): sets the thread priority of the io()
Scheduler
, default is Thread.NORM_PRIORITY
rx3.io-scheduled-release
(boolean): true
sets the worker release mode of the
io()
Scheduler
to scheduled, default is false
for eager mode.rx3.computation-threads
(int): sets the number of threads in the computation()
Scheduler
, default is the number of available CPUsrx3.computation-priority
(int): sets the thread priority of the computation()
Scheduler
, default is Thread.NORM_PRIORITY
rx3.newthread-priority
(int): sets the thread priority of the newThread()
Scheduler
, default is Thread.NORM_PRIORITY
rx3.single-priority
(int): sets the thread priority of the single()
Scheduler
, default is Thread.NORM_PRIORITY
rx3.purge-enabled
(boolean): enables purging of all Scheduler
's backing thread pools, default is true
rx3.scheduler.use-nanotime
(boolean): true
instructs Scheduler
to use System.nanoTime()
for Scheduler.now(TimeUnit)
,
instead of default System.currentTimeMillis()
(false
)Modifier and Type | Method and Description |
---|---|
static @NonNull Scheduler |
computation()
Returns a default, shared
Scheduler instance intended for computational work. |
static @NonNull Scheduler |
from(@NonNull Executor executor)
|
static @NonNull Scheduler |
from(@NonNull Executor executor,
boolean interruptibleWorker)
|
static @NonNull Scheduler |
from(@NonNull Executor executor,
boolean interruptibleWorker,
boolean fair)
|
static @NonNull Scheduler |
io()
Returns a default, shared
Scheduler instance intended for IO-bound work. |
static @NonNull Scheduler |
newThread()
|
static void |
shutdown()
Shuts down the standard
Scheduler s. |
static @NonNull Scheduler |
single()
Returns a default, shared, single-thread-backed
Scheduler instance for work
requiring strongly-sequential execution on the same background thread. |
static void |
start()
Starts the standard
Scheduler s. |
static @NonNull Scheduler |
trampoline()
Returns a default, shared
Scheduler instance whose Scheduler.Worker
instances queue work and execute them in a FIFO manner on one of the participating threads. |
@NonNull public static @NonNull Scheduler computation()
Scheduler
instance intended for computational work.
This can be used for event-loops, processing callbacks and other computational work.
It is not recommended to perform blocking, IO-bound work on this scheduler. Use io()
instead.
The default instance has a backing pool of single-threaded ScheduledExecutorService
instances equal to
the number of available processors (Runtime.availableProcessors()
) to the Java VM.
Unhandled errors will be delivered to the scheduler Thread's Thread.UncaughtExceptionHandler
.
This type of scheduler is less sensitive to leaking Scheduler.Worker
instances, although
not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
execute those tasks "unexpectedly".
If the RxJavaPlugins.setFailOnNonBlockingScheduler(boolean)
is set to true
, attempting to execute
operators that block while running on this scheduler will throw an IllegalStateException
.
You can control certain properties of this standard scheduler via system properties that have to be set
before the Schedulers
class is referenced in your code.
Supported system properties (System.getProperty()
):
rx3.computation-threads
(int): sets the number of threads in the computation()
Scheduler
, default is the number of available CPUsrx3.computation-priority
(int): sets the thread priority of the computation()
Scheduler
, default is Thread.NORM_PRIORITY
The default value of this scheduler can be overridden at initialization time via the
RxJavaPlugins.setInitComputationSchedulerHandler(io.reactivex.rxjava3.functions.Function)
plugin method.
Note that due to possible initialization cycles, using any of the other scheduler-returning methods will
result in a NullPointerException
.
Once the Schedulers
class has been initialized, you can override the returned Scheduler
instance
via the RxJavaPlugins.setComputationSchedulerHandler(io.reactivex.rxjava3.functions.Function)
method.
It is possible to create a fresh instance of this scheduler with a custom ThreadFactory
, via the
RxJavaPlugins.createComputationScheduler(ThreadFactory)
method. Note that such custom
instances require a manual call to Scheduler.shutdown()
to allow the JVM to exit or the
(J2EE) container to unload properly.
Operators on the base reactive classes that use this scheduler are marked with the
@SchedulerSupport
(COMPUTATION
)
annotation.
Scheduler
meant for computation-bound work@NonNull public static @NonNull Scheduler io()
Scheduler
instance intended for IO-bound work.
This can be used for asynchronously performing blocking IO.
The implementation is backed by a pool of single-threaded ScheduledExecutorService
instances
that will try to reuse previously started instances used by the worker
returned by Scheduler.createWorker()
but otherwise will start a new backing
ScheduledExecutorService
instance. Note that this scheduler may create an unbounded number
of worker threads that can result in system slowdowns or OutOfMemoryError
. Therefore, for casual uses
or when implementing an operator, the Worker instances must be disposed via Disposable.dispose()
.
It is not recommended to perform computational work on this scheduler. Use computation()
instead.
Unhandled errors will be delivered to the scheduler Thread's Thread.UncaughtExceptionHandler
.
You can control certain properties of this standard scheduler via system properties that have to be set
before the Schedulers
class is referenced in your code.
Supported system properties (System.getProperty()
):
rx3.io-keep-alive-time
(long): sets the keep-alive time of the io()
Scheduler
workers, default is IoScheduler.KEEP_ALIVE_TIME_DEFAULT
rx3.io-priority
(int): sets the thread priority of the io()
Scheduler
, default is Thread.NORM_PRIORITY
rx3.io-scheduled-release
(boolean): true
sets the worker release mode of the
#io()
Scheduler
to scheduled, default is false
for eager mode.
The default value of this scheduler can be overridden at initialization time via the
RxJavaPlugins.setInitIoSchedulerHandler(io.reactivex.rxjava3.functions.Function)
plugin method.
Note that due to possible initialization cycles, using any of the other scheduler-returning methods will
result in a NullPointerException
.
Once the Schedulers
class has been initialized, you can override the returned Scheduler
instance
via the RxJavaPlugins.setIoSchedulerHandler(io.reactivex.rxjava3.functions.Function)
method.
It is possible to create a fresh instance of this scheduler with a custom ThreadFactory
, via the
RxJavaPlugins.createIoScheduler(ThreadFactory)
method. Note that such custom
instances require a manual call to Scheduler.shutdown()
to allow the JVM to exit or the
(J2EE) container to unload properly.
Operators on the base reactive classes that use this scheduler are marked with the
@SchedulerSupport
(IO
)
annotation.
When the Scheduler.Worker
is disposed,
the underlying worker can be released to the cached worker pool in two modes:
rx3.io-scheduled-release
set to true
), the underlying worker is returned to the cached worker pool only after the currently running task
has finished. This can help prevent premature reuse of the underlying worker and likely won't lead to delays or
deadlock with such reuses. The drawback is that the delay in release may lead to an excess amount of underlying
workers being created.
Scheduler
meant for IO-bound work@NonNull public static @NonNull Scheduler trampoline()
Scheduler
instance whose Scheduler.Worker
instances queue work and execute them in a FIFO manner on one of the participating threads.
The default implementation's Scheduler.scheduleDirect(Runnable)
methods execute the tasks on the current thread
without any queueing and the timed overloads use blocking sleep as well.
Note that this scheduler can't be reliably used to return the execution of tasks to the "main" thread. Such behavior requires a blocking-queueing scheduler currently not provided by RxJava itself but may be found in external libraries.
This scheduler can't be overridden via an RxJavaPlugins
method.
Scheduler
that queues work on the current thread@NonNull public static @NonNull Scheduler newThread()
Scheduler
instance that creates a new Thread
for each unit of work.
The default implementation of this scheduler creates a new, single-threaded ScheduledExecutorService
for
each invocation of the Scheduler.scheduleDirect(Runnable)
(plus its overloads) and Scheduler.createWorker()
methods, thus an unbounded number of worker threads may be created that can
result in system slowdowns or OutOfMemoryError
. Therefore, for casual uses or when implementing an operator,
the Worker instances must be disposed via Disposable.dispose()
.
Unhandled errors will be delivered to the scheduler Thread's Thread.UncaughtExceptionHandler
.
You can control certain properties of this standard scheduler via system properties that have to be set
before the Schedulers
class is referenced in your code.
Supported system properties (System.getProperty()
):
rx3.newthread-priority
(int): sets the thread priority of the newThread()
Scheduler
, default is Thread.NORM_PRIORITY
The default value of this scheduler can be overridden at initialization time via the
RxJavaPlugins.setInitNewThreadSchedulerHandler(io.reactivex.rxjava3.functions.Function)
plugin method.
Note that due to possible initialization cycles, using any of the other scheduler-returning methods will
result in a NullPointerException
.
Once the Schedulers
class has been initialized, you can override the returned Scheduler
instance
via the RxJavaPlugins.setNewThreadSchedulerHandler(io.reactivex.rxjava3.functions.Function)
method.
It is possible to create a fresh instance of this scheduler with a custom ThreadFactory
, via the
RxJavaPlugins.createNewThreadScheduler(ThreadFactory)
method. Note that such custom
instances require a manual call to Scheduler.shutdown()
to allow the JVM to exit or the
(J2EE) container to unload properly.
Operators on the base reactive classes that use this scheduler are marked with the
@SchedulerSupport
(NEW_TRHEAD
)
annotation.
Scheduler
that creates new threads@NonNull public static @NonNull Scheduler single()
Scheduler
instance for work
requiring strongly-sequential execution on the same background thread.
Uses:
Schedulers.from(
Executor
)
and from(
ExecutorService
)
with delayed scheduling
Unhandled errors will be delivered to the scheduler Thread's Thread.UncaughtExceptionHandler
.
This type of scheduler is less sensitive to leaking Scheduler.Worker
instances, although
not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
execute those tasks "unexpectedly".
If the RxJavaPlugins.setFailOnNonBlockingScheduler(boolean)
is set to true
, attempting to execute
operators that block while running on this scheduler will throw an IllegalStateException
.
You can control certain properties of this standard scheduler via system properties that have to be set
before the Schedulers
class is referenced in your code.
Supported system properties (System.getProperty()
):
rx3.single-priority
(int): sets the thread priority of the single()
Scheduler
, default is Thread.NORM_PRIORITY
The default value of this scheduler can be overridden at initialization time via the
RxJavaPlugins.setInitSingleSchedulerHandler(io.reactivex.rxjava3.functions.Function)
plugin method.
Note that due to possible initialization cycles, using any of the other scheduler-returning methods will
result in a NullPointerException
.
Once the Schedulers
class has been initialized, you can override the returned Scheduler
instance
via the RxJavaPlugins.setSingleSchedulerHandler(io.reactivex.rxjava3.functions.Function)
method.
It is possible to create a fresh instance of this scheduler with a custom ThreadFactory
, via the
RxJavaPlugins.createSingleScheduler(ThreadFactory)
method. Note that such custom
instances require a manual call to Scheduler.shutdown()
to allow the JVM to exit or the
(J2EE) container to unload properly.
Operators on the base reactive classes that use this scheduler are marked with the
@SchedulerSupport
(SINGLE
)
annotation.
Scheduler
that shares a single backing thread.@NonNull public static @NonNull Scheduler from(@NonNull @NonNull Executor executor)
Executor
into a new Scheduler
instance and delegates schedule()
calls to it.
If the provided executor doesn't support any of the more specific standard Java executor
APIs, tasks scheduled by this scheduler can't be interrupted when they are
executing but only prevented from running prior to that. In addition, tasks scheduled with
a time delay or periodically will use the single()
scheduler for the timed waiting
before posting the actual task to the given executor.
Tasks submitted to the Scheduler.Worker
of this Scheduler
are also not interruptible. Use the
from(Executor, boolean)
overload to enable task interruption via this wrapper.
If the provided executor supports the standard Java ExecutorService
API,
tasks scheduled by this scheduler can be cancelled/interrupted by calling
Disposable.dispose()
. In addition, tasks scheduled with
a time delay or periodically will use the single()
scheduler for the timed waiting
before posting the actual task to the given executor.
If the provided executor supports the standard Java ScheduledExecutorService
API,
tasks scheduled by this scheduler can be cancelled/interrupted by calling
Disposable.dispose()
. In addition, tasks scheduled with
a time delay or periodically will use the provided executor. Note, however, if the provided
ScheduledExecutorService
instance is not single threaded, tasks scheduled
with a time delay close to each other may end up executing in different order than
the original schedule() call was issued. This limitation may be lifted in a future patch.
The implementation of the Worker of this wrapper Scheduler
is eager and will execute as many
non-delayed tasks as it can, which may result in a longer than expected occupation of a
thread of the given backing Executor
. In other terms, it does not allow per-Runnable
fairness
in case the worker runs on a shared underlying thread of the Executor
.
See from(Executor, boolean, boolean)
to create a wrapper that uses the underlying Executor
more fairly.
Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:
ExecutorService exec = Executors.newSingleThreadedExecutor();
try {
Scheduler scheduler = Schedulers.from(exec);
Flowable.just(1)
.subscribeOn(scheduler)
.map(v -> v + 1)
.observeOn(scheduler)
.blockingSubscribe(System.out::println);
} finally {
exec.shutdown();
}
Note that the provided Executor
should avoid throwing a RejectedExecutionException
(for example, by shutting it down prematurely or using a bounded-queue ExecutorService
)
because such circumstances prevent RxJava from progressing flow-related activities correctly.
If the Executor.execute(Runnable)
or ExecutorService.submit(Callable)
throws,
the RejectedExecutionException
is routed to the global error handler via
RxJavaPlugins.onError(Throwable)
. To avoid shutdown-related problems, it is recommended
all flows using the returned Scheduler
to be canceled/disposed before the underlying
Executor
is shut down. To avoid problems due to the Executor
having a bounded-queue,
it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.
This type of scheduler is less sensitive to leaking Scheduler.Worker
instances, although
not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
execute those tasks "unexpectedly".
Note that this method returns a new Scheduler
instance, even for the same Executor
instance.
It is possible to wrap an Executor
into a Scheduler
without triggering the initialization of all the
standard schedulers by using the RxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean)
method
before the Schedulers
class itself is accessed.
executor
- the executor to wrapScheduler
wrapping the Executor
from(Executor, boolean, boolean)
@NonNull public static @NonNull Scheduler from(@NonNull @NonNull Executor executor, boolean interruptibleWorker)
Executor
into a new Scheduler
instance and delegates schedule()
calls to it.
The tasks scheduled by the returned Scheduler
and its Scheduler.Worker
can be optionally interrupted.
If the provided executor doesn't support any of the more specific standard Java executor
APIs, tasks scheduled with a time delay or periodically will use the
single()
scheduler for the timed waiting
before posting the actual task to the given executor.
If the provided executor supports the standard Java ExecutorService
API,
tasks scheduled by this scheduler can be cancelled/interrupted by calling
Disposable.dispose()
. In addition, tasks scheduled with
a time delay or periodically will use the single()
scheduler for the timed waiting
before posting the actual task to the given executor.
If the provided executor supports the standard Java ScheduledExecutorService
API,
tasks scheduled by this scheduler can be cancelled/interrupted by calling
Disposable.dispose()
. In addition, tasks scheduled with
a time delay or periodically will use the provided executor. Note, however, if the provided
ScheduledExecutorService
instance is not single threaded, tasks scheduled
with a time delay close to each other may end up executing in different order than
the original schedule() call was issued. This limitation may be lifted in a future patch.
The implementation of the Worker
of this wrapper Scheduler
is eager and will execute as many
non-delayed tasks as it can, which may result in a longer than expected occupation of a
thread of the given backing Executor
. In other terms, it does not allow per-Runnable
fairness
in case the worker runs on a shared underlying thread of the Executor
.
See from(Executor, boolean, boolean)
to create a wrapper that uses the underlying Executor
more fairly.
Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:
ExecutorService exec = Executors.newSingleThreadedExecutor();
try {
Scheduler scheduler = Schedulers.from(exec, true);
Flowable.just(1)
.subscribeOn(scheduler)
.map(v -> v + 1)
.observeOn(scheduler)
.blockingSubscribe(System.out::println);
} finally {
exec.shutdown();
}
Note that the provided Executor
should avoid throwing a RejectedExecutionException
(for example, by shutting it down prematurely or using a bounded-queue ExecutorService
)
because such circumstances prevent RxJava from progressing flow-related activities correctly.
If the Executor.execute(Runnable)
or ExecutorService.submit(Callable)
throws,
the RejectedExecutionException
is routed to the global error handler via
RxJavaPlugins.onError(Throwable)
. To avoid shutdown-related problems, it is recommended
all flows using the returned Scheduler
to be canceled/disposed before the underlying
Executor
is shut down. To avoid problems due to the Executor
having a bounded-queue,
it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.
This type of scheduler is less sensitive to leaking Scheduler.Worker
instances, although
not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
execute those tasks "unexpectedly".
Note that this method returns a new Scheduler
instance, even for the same Executor
instance.
It is possible to wrap an Executor
into a Scheduler
without triggering the initialization of all the
standard schedulers by using the RxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean)
method
before the Schedulers
class itself is accessed.
History: 2.2.6 - experimental
executor
- the executor to wrapinterruptibleWorker
- if true
, the tasks submitted to the Scheduler.Worker
will
be interrupted when the task is disposed.Scheduler
wrapping the Executor
from(Executor, boolean, boolean)
@NonNull public static @NonNull Scheduler from(@NonNull @NonNull Executor executor, boolean interruptibleWorker, boolean fair)
Executor
into a new Scheduler
instance and delegates schedule()
calls to it.
The tasks scheduled by the returned Scheduler
and its Scheduler.Worker
can be optionally interrupted.
If the provided executor doesn't support any of the more specific standard Java executor
APIs, tasks scheduled with a time delay or periodically will use the
single()
scheduler for the timed waiting
before posting the actual task to the given executor.
If the provided executor supports the standard Java ExecutorService
API,
tasks scheduled by this scheduler can be cancelled/interrupted by calling
Disposable.dispose()
. In addition, tasks scheduled with
a time delay or periodically will use the single()
scheduler for the timed waiting
before posting the actual task to the given executor.
If the provided executor supports the standard Java ScheduledExecutorService
API,
tasks scheduled by this scheduler can be cancelled/interrupted by calling
Disposable.dispose()
. In addition, tasks scheduled with
a time delay or periodically will use the provided executor. Note, however, if the provided
ScheduledExecutorService
instance is not single threaded, tasks scheduled
with a time delay close to each other may end up executing in different order than
the original schedule() call was issued. This limitation may be lifted in a future patch.
The implementation of the Worker of this wrapper Scheduler
can operate in both eager (non-fair) and
fair modes depending on the specified parameter. In eager mode, it will execute as many
non-delayed tasks as it can, which may result in a longer than expected occupation of a
thread of the given backing Executor
. In other terms, it does not allow per-Runnable
fairness
in case the worker runs on a shared underlying thread of the Executor
. In fair mode,
non-delayed tasks will still be executed in a FIFO and non-overlapping manner, but after each task,
the execution for the next task is rescheduled with the same underlying Executor
, allowing interleaving
from both the same Scheduler
or other external usages of the underlying Executor
.
Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:
ExecutorService exec = Executors.newSingleThreadedExecutor();
try {
Scheduler scheduler = Schedulers.from(exec, true, true);
Flowable.just(1)
.subscribeOn(scheduler)
.map(v -> v + 1)
.observeOn(scheduler)
.blockingSubscribe(System.out::println);
} finally {
exec.shutdown();
}
Note that the provided Executor
should avoid throwing a RejectedExecutionException
(for example, by shutting it down prematurely or using a bounded-queue ExecutorService
)
because such circumstances prevent RxJava from progressing flow-related activities correctly.
If the Executor.execute(Runnable)
or ExecutorService.submit(Callable)
throws,
the RejectedExecutionException
is routed to the global error handler via
RxJavaPlugins.onError(Throwable)
. To avoid shutdown-related problems, it is recommended
all flows using the returned Scheduler
to be canceled/disposed before the underlying
Executor
is shut down. To avoid problems due to the Executor
having a bounded-queue,
it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.
This type of scheduler is less sensitive to leaking Scheduler.Worker
instances, although
not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
execute those tasks "unexpectedly".
Note that this method returns a new Scheduler
instance, even for the same Executor
instance.
It is possible to wrap an Executor
into a Scheduler
without triggering the initialization of all the
standard schedulers by using the RxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean)
method
before the Schedulers
class itself is accessed.
executor
- the executor to wrapinterruptibleWorker
- if true
, the tasks submitted to the Scheduler.Worker
will
be interrupted when the task is disposed.fair
- if true
, tasks submitted to the Scheduler
or Worker
will be executed by the underlying Executor
one after the other, still
in a FIFO and non-overlapping manner, but allows interleaving with other tasks submitted to the underlying Executor
.
If false
, the underlying FIFO scheme will execute as many tasks as it can before giving up the underlying Executor
thread.Scheduler
wrapping the Executor
public static void shutdown()
Scheduler
s.
The operation is idempotent and thread-safe.
public static void start()
Scheduler
s.
The operation is idempotent and thread-safe.