Class Schedulers
Static factory methods for returning standard Scheduler instances.
The initial and runtime values of the various scheduler types can be overridden via the
RxJavaPlugins.setInit(scheduler name)SchedulerHandler() and
RxJavaPlugins.set(scheduler name)SchedulerHandler() respectively.
Note that overriding any initial Scheduler via the RxJavaPlugins
has to happen before the Schedulers class is accessed.
Supported system properties (System.getProperty()):
- rx3.io-keep-alive-time (long): sets the keep-alive time of the io() Scheduler workers, default is IoScheduler.KEEP_ALIVE_TIME_DEFAULT
- rx3.io-priority (int): sets the thread priority of the io() Scheduler, default is Thread.NORM_PRIORITY
- rx3.io-scheduled-release (boolean): true sets the worker release mode of the io() Scheduler to scheduled, default is false for eager mode.
- rx3.computation-threads (int): sets the number of threads in the computation() Scheduler, default is the number of available CPUs
- rx3.computation-priority (int): sets the thread priority of the computation() Scheduler, default is Thread.NORM_PRIORITY
- rx3.newthread-priority (int): sets the thread priority of the newThread() Scheduler, default is Thread.NORM_PRIORITY
- rx3.single-priority (int): sets the thread priority of the single() Scheduler, default is Thread.NORM_PRIORITY
- rx3.purge-enabled (boolean): enables purging of all Scheduler's backing thread pools, default is true
- rx3.scheduler.use-nanotime (boolean): true instructs Scheduler to use System.nanoTime() for Scheduler.now(TimeUnit), instead of default System.currentTimeMillis() (false)
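As a minimal sketch of the ordering requirement, the properties can be set programmatically at the very start of the program, before any code references the Schedulers class. Passing them as -D flags on the JVM command line (for example -Drx3.computation-threads=4) is an equivalent route. The class name SchedulerProperties and the chosen values are illustrative assumptions; only the property keys come from the list above.

```java
// Hypothetical bootstrap class; the name and the values are assumptions for illustration.
class SchedulerProperties {

    /** Sets a few of the documented properties. Call before Schedulers is first referenced. */
    static void configure() {
        System.setProperty("rx3.computation-threads", "4");
        System.setProperty("rx3.io-priority", String.valueOf(Thread.NORM_PRIORITY));
        System.setProperty("rx3.io-scheduled-release", "true");
        System.setProperty("rx3.scheduler.use-nanotime", "true");
    }

    public static void main(String[] args) {
        configure();
        // The first reference to Schedulers would go here, after configure().
        System.out.println(Integer.getInteger("rx3.computation-threads"));  // prints 4
        System.out.println(Boolean.getBoolean("rx3.io-scheduled-release")); // prints true
    }
}
```

Setting a property after the Schedulers class has been initialized would have no effect on the standard instances, which is why the call sits first in main.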
-
Method Summary
Modifier and Type | Method | Description
static Scheduler | computation() | Returns a default, shared Scheduler instance intended for computational work.
static Scheduler | io() | Returns a default, shared Scheduler instance intended for IO-bound work.
static void | shutdown() | Shuts down the standard Schedulers.
static Scheduler | single() | Returns a default, shared, single-thread-backed Scheduler instance for work requiring strongly-sequential execution on the same background thread.
static void | start() | Starts the standard Schedulers.
static Scheduler | trampoline() | Returns a default, shared Scheduler instance whose Scheduler.Worker instances queue work and execute them in a FIFO manner on one of the participating threads.
-
Method Details
-
computation
Returns a default, shared Scheduler instance intended for computational work.
This can be used for event-loops, processing callbacks and other computational work.
It is not recommended to perform blocking, IO-bound work on this scheduler. Use io() instead.
The default instance has a backing pool of single-threaded ScheduledExecutorService instances equal to the number of available processors (Runtime.availableProcessors()) to the Java VM.
Unhandled errors will be delivered to the scheduler Thread's Thread.UncaughtExceptionHandler.
This type of scheduler is less sensitive to leaking Scheduler.Worker instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".
If the RxJavaPlugins.setFailOnNonBlockingScheduler(boolean) is set to true, attempting to execute operators that block while running on this scheduler will throw an IllegalStateException.
You can control certain properties of this standard scheduler via system properties that have to be set before the Schedulers class is referenced in your code.
Supported system properties (System.getProperty()):
- rx3.computation-threads (int): sets the number of threads in the computation() Scheduler, default is the number of available CPUs
- rx3.computation-priority (int): sets the thread priority of the computation() Scheduler, default is Thread.NORM_PRIORITY
The default value of this scheduler can be overridden at initialization time via the RxJavaPlugins.setInitComputationSchedulerHandler(io.reactivex.rxjava4.functions.Function) plugin method. Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in a NullPointerException. Once the Schedulers class has been initialized, you can override the returned Scheduler instance via the RxJavaPlugins.setComputationSchedulerHandler(io.reactivex.rxjava4.functions.Function) method.
It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the RxJavaPlugins.createComputationScheduler(ThreadFactory) method. Note that such custom instances require a manual call to Scheduler.shutdown() to allow the JVM to exit or the (J2EE) container to unload properly.
Operators on the base reactive classes that use this scheduler are marked with the @SchedulerSupport(COMPUTATION) annotation.
- Returns:
- a Scheduler meant for computation-bound work
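The shape described above (a fixed pool of single-threaded ScheduledExecutorService instances, sized to the processor count and handed out in round-robin order) can be modeled with JDK classes alone. The following is a simplified sketch under that assumption, not RxJava's actual implementation; the class RoundRobinPoolSketch and its methods are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of a fixed, round-robin pool; not the RxJava implementation.
class RoundRobinPoolSketch {
    private final ScheduledExecutorService[] pool;
    private final AtomicLong counter = new AtomicLong();

    RoundRobinPoolSketch(int size) {
        pool = new ScheduledExecutorService[size];
        for (int i = 0; i < size; i++) {
            // Each slot is backed by exactly one thread, created lazily on first use.
            pool[i] = Executors.newSingleThreadScheduledExecutor();
        }
    }

    /** Index of the pool slot that the next worker would be bound to. */
    int nextIndex() {
        return (int) (counter.getAndIncrement() % pool.length);
    }

    /** Picks the next single-threaded executor in round-robin order. */
    ScheduledExecutorService nextExecutor() {
        return pool[nextIndex()];
    }

    /** Custom pools must be shut down manually so the JVM can exit. */
    void shutdown() {
        for (ScheduledExecutorService exec : pool) {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        RoundRobinPoolSketch sketch = new RoundRobinPoolSketch(cpus);
        try {
            for (int i = 0; i < 5; i++) {
                System.out.println("worker " + i + " -> slot " + sketch.nextIndex());
            }
        } finally {
            sketch.shutdown();
        }
    }
}
```

Because each slot has a single thread, a blocking task on one slot stalls every worker bound to it, which is one way to see why blocking, IO-bound work is discouraged on this scheduler.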
-
io
Returns a default, shared Scheduler instance intended for IO-bound work.
This can be used for asynchronously performing blocking IO.
The implementation is backed by a pool of single-threaded ScheduledExecutorService instances that will try to reuse previously started instances used by the worker returned by Scheduler.createWorker() but otherwise will start a new backing ScheduledExecutorService instance. Note that this scheduler may create an unbounded number of worker threads that can result in system slowdowns or OutOfMemoryError. Therefore, for casual uses or when implementing an operator, the Worker instances must be disposed via Disposable.dispose().
It is not recommended to perform computational work on this scheduler. Use computation() instead.
Unhandled errors will be delivered to the scheduler Thread's Thread.UncaughtExceptionHandler.
You can control certain properties of this standard scheduler via system properties that have to be set before the Schedulers class is referenced in your code.
Supported system properties (System.getProperty()):
- rx3.io-keep-alive-time (long): sets the keep-alive time of the io() Scheduler workers, default is IoScheduler.KEEP_ALIVE_TIME_DEFAULT
- rx3.io-priority (int): sets the thread priority of the io() Scheduler, default is Thread.NORM_PRIORITY
- rx3.io-scheduled-release (boolean): true sets the worker release mode of the io() Scheduler to scheduled, default is false for eager mode.
The default value of this scheduler can be overridden at initialization time via the RxJavaPlugins.setInitIoSchedulerHandler(io.reactivex.rxjava4.functions.Function) plugin method. Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in a NullPointerException. Once the Schedulers class has been initialized, you can override the returned Scheduler instance via the RxJavaPlugins.setIoSchedulerHandler(io.reactivex.rxjava4.functions.Function) method.
It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the RxJavaPlugins.createIoScheduler(ThreadFactory) method. Note that such custom instances require a manual call to Scheduler.shutdown() to allow the JVM to exit or the (J2EE) container to unload properly.
Operators on the base reactive classes that use this scheduler are marked with the @SchedulerSupport(IO) annotation.
When the Scheduler.Worker is disposed, the underlying worker can be released to the cached worker pool in two modes:
- In eager mode (default), the underlying worker is returned immediately to the cached worker pool and can be reused much quicker by operators. The drawback is that if the currently running task doesn't respond to interruption in time or at all, this may lead to delays or deadlock with the reuse of the underlying worker.
- In scheduled mode (enabled via the system parameter rx3.io-scheduled-release set to true), the underlying worker is returned to the cached worker pool only after the currently running task has finished. This can help prevent premature reuse of the underlying worker and likely won't lead to delays or deadlock with such reuses. The drawback is that the delay in release may lead to an excess amount of underlying workers being created.
- Returns:
- a Scheduler meant for IO-bound work
-
trampoline
Returns a default, shared Scheduler instance whose Scheduler.Worker instances queue work and execute them in a FIFO manner on one of the participating threads.
The default implementation's Scheduler.scheduleDirect(Runnable) methods execute the tasks on the current thread without any queueing and the timed overloads use blocking sleep as well.
Note that this scheduler can't be reliably used to return the execution of tasks to the "main" thread. Such behavior requires a blocking-queueing scheduler currently not provided by RxJava itself but may be found in external libraries.
This scheduler can't be overridden via an RxJavaPlugins method.
- Returns:
- a Scheduler that queues work on the current thread
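The queue-then-run behavior of a trampolining worker can be illustrated without RxJava. The sketch below is a single-threaded model under stated assumptions, not the library's implementation: TrampolineWorker is a hypothetical class in which a task scheduled from inside a running task is queued and runs only after the current task returns, on the same thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Single-threaded model of trampolining; not the RxJava implementation.
class TrampolineSketch {

    /** Hypothetical worker: runs tasks in FIFO order on the calling thread. */
    static final class TrampolineWorker {
        private final Queue<Runnable> queue = new ArrayDeque<>();
        private boolean running;

        void schedule(Runnable task) {
            queue.add(task);
            if (running) {
                return; // nested call: the outer drain loop will pick the task up
            }
            running = true;
            try {
                Runnable next;
                while ((next = queue.poll()) != null) {
                    next.run();
                }
            } finally {
                running = false;
            }
        }
    }

    /** Schedules a task that schedules another task, and records the execution order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        TrampolineWorker worker = new TrampolineWorker();
        worker.schedule(() -> {
            log.add("outer start");
            worker.schedule(() -> log.add("inner")); // queued, not run recursively
            log.add("outer end");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [outer start, outer end, inner]
    }
}
```

The nested task runs after the outer one finishes rather than inside it, which avoids unbounded recursion when tasks keep rescheduling themselves.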
-
newThread
Returns a default, shared Scheduler instance that creates a new Thread for each unit of work.
The default implementation of this scheduler creates a new, single-threaded ScheduledExecutorService for each invocation of the Scheduler.scheduleDirect(Runnable) (plus its overloads) and Scheduler.createWorker() methods, thus an unbounded number of worker threads may be created that can result in system slowdowns or OutOfMemoryError. Therefore, for casual uses or when implementing an operator, the Worker instances must be disposed via Disposable.dispose().
Unhandled errors will be delivered to the scheduler Thread's Thread.UncaughtExceptionHandler.
You can control certain properties of this standard scheduler via system properties that have to be set before the Schedulers class is referenced in your code.
Supported system properties (System.getProperty()):
- rx3.newthread-priority (int): sets the thread priority of the newThread() Scheduler, default is Thread.NORM_PRIORITY
The default value of this scheduler can be overridden at initialization time via the RxJavaPlugins.setInitNewThreadSchedulerHandler(io.reactivex.rxjava4.functions.Function) plugin method. Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in a NullPointerException. Once the Schedulers class has been initialized, you can override the returned Scheduler instance via the RxJavaPlugins.setNewThreadSchedulerHandler(io.reactivex.rxjava4.functions.Function) method.
It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the RxJavaPlugins.createNewThreadScheduler(ThreadFactory) method. Note that such custom instances require a manual call to Scheduler.shutdown() to allow the JVM to exit or the (J2EE) container to unload properly.
Operators on the base reactive classes that use this scheduler are marked with the @SchedulerSupport(NEW_THREAD) annotation.
- Returns:
- a Scheduler that creates new threads
-
single
Returns a default, shared, single-thread-backed Scheduler instance for work requiring strongly-sequential execution on the same background thread.
Uses:
- event loop
- support Schedulers.from(Executor) and from(ExecutorService) with delayed scheduling
- support benchmarks that pipeline data from some thread to another thread and avoid core-bashing of computation's round-robin nature
Unhandled errors will be delivered to the scheduler Thread's Thread.UncaughtExceptionHandler.
This type of scheduler is less sensitive to leaking Scheduler.Worker instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".
If the RxJavaPlugins.setFailOnNonBlockingScheduler(boolean) is set to true, attempting to execute operators that block while running on this scheduler will throw an IllegalStateException.
You can control certain properties of this standard scheduler via system properties that have to be set before the Schedulers class is referenced in your code.
Supported system properties (System.getProperty()):
- rx3.single-priority (int): sets the thread priority of the single() Scheduler, default is Thread.NORM_PRIORITY
The default value of this scheduler can be overridden at initialization time via the RxJavaPlugins.setInitSingleSchedulerHandler(io.reactivex.rxjava4.functions.Function) plugin method. Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in a NullPointerException. Once the Schedulers class has been initialized, you can override the returned Scheduler instance via the RxJavaPlugins.setSingleSchedulerHandler(io.reactivex.rxjava4.functions.Function) method.
It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the RxJavaPlugins.createSingleScheduler(ThreadFactory) method. Note that such custom instances require a manual call to Scheduler.shutdown() to allow the JVM to exit or the (J2EE) container to unload properly.
Operators on the base reactive classes that use this scheduler are marked with the @SchedulerSupport(SINGLE) annotation.
- Returns:
- a Scheduler that shares a single backing thread.
- Since:
- 2.0
-
from
Wraps an Executor into a new Scheduler instance and delegates schedule() calls to it.
If the provided executor doesn't support any of the more specific standard Java executor APIs, tasks scheduled by this scheduler can't be interrupted when they are executing but only prevented from running prior to that. In addition, tasks scheduled with a time delay or periodically will use the single() scheduler for the timed waiting before posting the actual task to the given executor.
Tasks submitted to the Scheduler.Worker of this Scheduler are also not interruptible. Use the from(Executor, boolean) overload to enable task interruption via this wrapper.
If the provided executor supports the standard Java ExecutorService API, tasks scheduled by this scheduler can be cancelled/interrupted by calling Disposable.dispose(). In addition, tasks scheduled with a time delay or periodically will use the single() scheduler for the timed waiting before posting the actual task to the given executor.
If the provided executor supports the standard Java ScheduledExecutorService API, tasks scheduled by this scheduler can be cancelled/interrupted by calling Disposable.dispose(). In addition, tasks scheduled with a time delay or periodically will use the provided executor. Note, however, if the provided ScheduledExecutorService instance is not single threaded, tasks scheduled with a time delay close to each other may end up executing in a different order than the original schedule() calls were issued. This limitation may be lifted in a future patch.
The implementation of the Worker of this wrapper Scheduler is eager and will execute as many non-delayed tasks as it can, which may result in a longer than expected occupation of a thread of the given backing Executor. In other words, it does not allow per-Runnable fairness in case the worker runs on a shared underlying thread of the Executor. See from(Executor, boolean, boolean) to create a wrapper that uses the underlying Executor more fairly.
Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:
    ExecutorService exec = Executors.newSingleThreadExecutor();
    try {
        Scheduler scheduler = Schedulers.from(exec);
        Flowable.just(1)
            .subscribeOn(scheduler)
            .map(v -> v + 1)
            .observeOn(scheduler)
            .blockingSubscribe(System.out::println);
    } finally {
        exec.shutdown();
    }
Note that the provided Executor should avoid throwing a RejectedExecutionException (for example, by shutting it down prematurely or using a bounded-queue ExecutorService) because such circumstances prevent RxJava from progressing flow-related activities correctly. If the Executor.execute(Runnable) or ExecutorService.submit(Callable) throws, the RejectedExecutionException is routed to the global error handler via RxJavaPlugins.onError(Throwable). To avoid shutdown-related problems, it is recommended that all flows using the returned Scheduler be canceled/disposed before the underlying Executor is shut down. To avoid problems due to the Executor having a bounded queue, it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.
This type of scheduler is less sensitive to leaking Scheduler.Worker instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".
Note that this method returns a new Scheduler instance, even for the same Executor instance.
It is possible to wrap an Executor into a Scheduler without triggering the initialization of all the standard schedulers by using the RxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean) method before the Schedulers class itself is accessed.
- Parameters:
- executor - the executor to wrap
- Returns:
- the new Scheduler wrapping the Executor
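The warning about premature shutdown rests on standard JDK behavior: an ExecutorService that has been shut down rejects new tasks. The sketch below shows only that JDK-level effect, using a hypothetical class name RejectionSketch; how a wrapping Scheduler reacts is described in the text above and is not modeled here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Demonstrates the JDK behavior behind the shutdown warning; no RxJava involved.
class RejectionSketch {

    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean rejectedAfterShutdown() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        exec.shutdown(); // simulates shutting the executor down too early
        try {
            exec.execute(() -> { });
            return false;
        } catch (RejectedExecutionException ex) {
            return true; // the default rejection policy throws
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedAfterShutdown()); // prints true
    }
}
```

A flow that still tries to schedule work at this point cannot make progress, which is the reason for disposing flows before shutting the executor down.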
-
from
public static @NonNull Scheduler from(@NonNull Executor executor, boolean interruptibleWorker)
Wraps an Executor into a new Scheduler instance and delegates schedule() calls to it.
The tasks scheduled by the returned Scheduler and its Scheduler.Worker can be optionally interrupted.
If the provided executor doesn't support any of the more specific standard Java executor APIs, tasks scheduled with a time delay or periodically will use the single() scheduler for the timed waiting before posting the actual task to the given executor.
If the provided executor supports the standard Java ExecutorService API, tasks scheduled by this scheduler can be cancelled/interrupted by calling Disposable.dispose(). In addition, tasks scheduled with a time delay or periodically will use the single() scheduler for the timed waiting before posting the actual task to the given executor.
If the provided executor supports the standard Java ScheduledExecutorService API, tasks scheduled by this scheduler can be cancelled/interrupted by calling Disposable.dispose(). In addition, tasks scheduled with a time delay or periodically will use the provided executor. Note, however, if the provided ScheduledExecutorService instance is not single threaded, tasks scheduled with a time delay close to each other may end up executing in a different order than the original schedule() calls were issued. This limitation may be lifted in a future patch.
The implementation of the Worker of this wrapper Scheduler is eager and will execute as many non-delayed tasks as it can, which may result in a longer than expected occupation of a thread of the given backing Executor. In other words, it does not allow per-Runnable fairness in case the worker runs on a shared underlying thread of the Executor. See from(Executor, boolean, boolean) to create a wrapper that uses the underlying Executor more fairly.
Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:
    ExecutorService exec = Executors.newSingleThreadExecutor();
    try {
        Scheduler scheduler = Schedulers.from(exec, true);
        Flowable.just(1)
            .subscribeOn(scheduler)
            .map(v -> v + 1)
            .observeOn(scheduler)
            .blockingSubscribe(System.out::println);
    } finally {
        exec.shutdown();
    }
Note that the provided Executor should avoid throwing a RejectedExecutionException (for example, by shutting it down prematurely or using a bounded-queue ExecutorService) because such circumstances prevent RxJava from progressing flow-related activities correctly. If the Executor.execute(Runnable) or ExecutorService.submit(Callable) throws, the RejectedExecutionException is routed to the global error handler via RxJavaPlugins.onError(Throwable). To avoid shutdown-related problems, it is recommended that all flows using the returned Scheduler be canceled/disposed before the underlying Executor is shut down. To avoid problems due to the Executor having a bounded queue, it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.
This type of scheduler is less sensitive to leaking Scheduler.Worker instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".
Note that this method returns a new Scheduler instance, even for the same Executor instance.
It is possible to wrap an Executor into a Scheduler without triggering the initialization of all the standard schedulers by using the RxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean) method before the Schedulers class itself is accessed.
History: 2.2.6 - experimental
- Parameters:
- executor - the executor to wrap
- interruptibleWorker - if true, the tasks submitted to the Scheduler.Worker will be interrupted when the task is disposed.
- Returns:
- the new Scheduler wrapping the Executor
- Since:
- 3.0.0
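The timed-waiting arrangement described for plain Executor and ExecutorService instances (a separate timer waits out the delay, then posts the task to the given executor) can be modeled with JDK classes alone. This is a sketch of the idea, not RxJava's implementation: a dedicated single-threaded ScheduledExecutorService stands in for the single() scheduler, and the class name and thread names are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Models "wait on a timer thread, then hand off to the target executor".
class DelayedHandoffSketch {

    /** Runs a task after the delay and returns the name of the thread that executed it. */
    static String runDelayed(long delayMillis) {
        // Stand-in for the single() scheduler: only waits, never runs the task itself.
        ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "timer"));
        // Stand-in for the user-provided executor that has no scheduling support.
        ExecutorService target =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "target"));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            Runnable task = () -> ranOn.complete(Thread.currentThread().getName());
            // After the delay, the timer thread only posts the task to the target.
            timer.schedule(() -> target.execute(task), delayMillis, TimeUnit.MILLISECONDS);
            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            timer.shutdown();
            target.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDelayed(50)); // prints target
    }
}
```

The task body always runs on the target executor's thread; the timer thread is occupied only for the brief hand-off.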
-
from
public static @NonNull Scheduler from(@NonNull Executor executor, boolean interruptibleWorker, boolean fair)
Wraps an Executor into a new Scheduler instance and delegates schedule() calls to it.
The tasks scheduled by the returned Scheduler and its Scheduler.Worker can be optionally interrupted.
If the provided executor doesn't support any of the more specific standard Java executor APIs, tasks scheduled with a time delay or periodically will use the single() scheduler for the timed waiting before posting the actual task to the given executor.
If the provided executor supports the standard Java ExecutorService API, tasks scheduled by this scheduler can be cancelled/interrupted by calling Disposable.dispose(). In addition, tasks scheduled with a time delay or periodically will use the single() scheduler for the timed waiting before posting the actual task to the given executor.
If the provided executor supports the standard Java ScheduledExecutorService API, tasks scheduled by this scheduler can be cancelled/interrupted by calling Disposable.dispose(). In addition, tasks scheduled with a time delay or periodically will use the provided executor. Note, however, if the provided ScheduledExecutorService instance is not single threaded, tasks scheduled with a time delay close to each other may end up executing in a different order than the original schedule() calls were issued. This limitation may be lifted in a future patch.
The implementation of the Worker of this wrapper Scheduler can operate in both eager (non-fair) and fair modes depending on the specified parameter. In eager mode, it will execute as many non-delayed tasks as it can, which may result in a longer than expected occupation of a thread of the given backing Executor. In other words, it does not allow per-Runnable fairness in case the worker runs on a shared underlying thread of the Executor. In fair mode, non-delayed tasks will still be executed in a FIFO and non-overlapping manner, but after each task, the execution for the next task is rescheduled with the same underlying Executor, allowing interleaving from both the same Scheduler or other external usages of the underlying Executor.
Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:
    ExecutorService exec = Executors.newSingleThreadExecutor();
    try {
        Scheduler scheduler = Schedulers.from(exec, true, true);
        Flowable.just(1)
            .subscribeOn(scheduler)
            .map(v -> v + 1)
            .observeOn(scheduler)
            .blockingSubscribe(System.out::println);
    } finally {
        exec.shutdown();
    }
Note that the provided Executor should avoid throwing a RejectedExecutionException (for example, by shutting it down prematurely or using a bounded-queue ExecutorService) because such circumstances prevent RxJava from progressing flow-related activities correctly. If the Executor.execute(Runnable) or ExecutorService.submit(Callable) throws, the RejectedExecutionException is routed to the global error handler via RxJavaPlugins.onError(Throwable). To avoid shutdown-related problems, it is recommended that all flows using the returned Scheduler be canceled/disposed before the underlying Executor is shut down. To avoid problems due to the Executor having a bounded queue, it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.
This type of scheduler is less sensitive to leaking Scheduler.Worker instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".
Note that this method returns a new Scheduler instance, even for the same Executor instance.
It is possible to wrap an Executor into a Scheduler without triggering the initialization of all the standard schedulers by using the RxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean) method before the Schedulers class itself is accessed.
- Parameters:
- executor - the executor to wrap
- interruptibleWorker - if true, the tasks submitted to the Scheduler.Worker will be interrupted when the task is disposed.
- fair - if true, tasks submitted to the Scheduler or Worker will be executed by the underlying Executor one after the other, still in a FIFO and non-overlapping manner, but allows interleaving with other tasks submitted to the underlying Executor. If false, the underlying FIFO scheme will execute as many tasks as it can before giving up the underlying Executor thread.
- Returns:
- the new Scheduler wrapping the Executor
- Since:
- 3.0.0
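The difference between eager and fair mode can be made concrete with a small deterministic model. The sketch below is an illustration under stated assumptions, not RxJava's implementation: ManualExecutor and SerialWorker are hypothetical classes, everything runs on the calling thread, and a real worker would need thread-safe state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

// Single-threaded model of eager vs. fair draining; not the RxJava implementation.
class FairnessSketch {

    /** Hypothetical executor that runs its queued tasks only when pumped. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable r) {
            tasks.add(r);
        }

        void runAll() {
            Runnable r;
            while ((r = tasks.poll()) != null) {
                r.run();
            }
        }
    }

    /** Hypothetical FIFO worker that drains its queue on the shared executor. */
    static final class SerialWorker {
        private final Executor executor;
        private final boolean fair;
        private final Queue<Runnable> queue = new ArrayDeque<>();
        private boolean drainScheduled;

        SerialWorker(Executor executor, boolean fair) {
            this.executor = executor;
            this.fair = fair;
        }

        void schedule(Runnable task) {
            queue.add(task);
            if (!drainScheduled) {
                drainScheduled = true;
                executor.execute(this::drain);
            }
        }

        private void drain() {
            if (fair) {
                // Fair: run one task, then requeue the drain behind other executor work.
                Runnable task = queue.poll();
                if (task != null) {
                    task.run();
                }
                if (queue.isEmpty()) {
                    drainScheduled = false;
                } else {
                    executor.execute(this::drain);
                }
            } else {
                // Eager: keep the executor thread until the worker's queue is empty.
                Runnable task;
                while ((task = queue.poll()) != null) {
                    task.run();
                }
                drainScheduled = false;
            }
        }
    }

    /** Schedules three worker tasks plus one unrelated executor task; returns the run order. */
    static List<String> run(boolean fair) {
        List<String> log = new ArrayList<>();
        ManualExecutor executor = new ManualExecutor();
        SerialWorker worker = new SerialWorker(executor, fair);
        worker.schedule(() -> log.add("w1"));
        worker.schedule(() -> log.add("w2"));
        worker.schedule(() -> log.add("w3"));
        executor.execute(() -> log.add("external"));
        executor.runAll();
        return log;
    }

    public static void main(String[] args) {
        System.out.println("eager: " + run(false)); // prints eager: [w1, w2, w3, external]
        System.out.println("fair:  " + run(true));  // prints fair:  [w1, external, w2, w3]
    }
}
```

In both modes the worker's own tasks keep their FIFO order; fair mode only changes when other users of the executor get a turn.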
-
shutdown
public static void shutdown()
Shuts down the standard Schedulers.
The operation is idempotent and thread-safe.
-
start
public static void start()
Starts the standard Schedulers.
The operation is idempotent and thread-safe.
-