public abstract class Completable extends Object implements CompletableSource
Completable
class represents a deferred computation without any value but
only indication for completion or exception.
Completable
behaves similarly to Observable
except that it can only emit either
a completion or error signal (there is no onNext
or onSuccess
as with the other
reactive types).
The Completable
class implements the CompletableSource
base interface and the default consumer
type it interacts with is the CompletableObserver
via the subscribe(CompletableObserver)
method.
The Completable
operates with the following sequential protocol:
onSubscribe (onError | onComplete)?
Note that as with the Observable
protocol, onError
and onComplete
are mutually exclusive events.
Like Observable
, a running Completable
can be stopped through the Disposable
instance
provided to consumers through SingleObserver.onSubscribe(io.reactivex.rxjava3.disposables.Disposable)
.
Like an Observable
, a Completable
is lazy, can be either "hot" or "cold", synchronous or
asynchronous. Completable
instances returned by the methods of this class are cold
and there is a standard hot implementation in the form of a subject:
CompletableSubject
.
The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
See Flowable
or Observable
for the
implementation of the Reactive Pattern for a stream or vector of values.
Example:
Disposable d = Completable.complete()
.delay(10, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onStart() {
System.out.println("Started");
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(5000);
d.dispose();
Note that by design, subscriptions via subscribe(CompletableObserver)
can't be disposed
from the outside (hence the
void
return of the subscribe(CompletableObserver)
method) and it is the
responsibility of the implementor of the CompletableObserver
to allow this to happen.
RxJava supports such usage with the standard
DisposableCompletableObserver
instance.
For convenience, the subscribeWith(CompletableObserver)
method is provided as well to
allow working with a CompletableObserver
(or subclass) instance to be applied with in
a fluent manner (such as in the example above).
DisposableCompletableObserver
Constructor and Description |
---|
Completable() |
Modifier and Type | Method and Description |
---|---|
static @NonNull Completable |
amb(@NonNull Iterable<? extends CompletableSource> sources)
Returns a Completable which terminates as soon as one of the source Completables
terminates (normally or with an error) and disposes all other Completables.
|
static @NonNull Completable |
ambArray(CompletableSource... sources)
Returns a Completable which terminates as soon as one of the source Completables
terminates (normally or with an error) and disposes all other Completables.
|
@NonNull Completable |
ambWith(@NonNull CompletableSource other)
Returns a Completable that emits the a terminated event of either this Completable
or the other Completable whichever fires first.
|
@NonNull Completable |
andThen(@NonNull CompletableSource next)
Returns a Completable that first runs this Completable
and then the other completable.
|
<T> @NonNull Maybe<T> |
andThen(@NonNull MaybeSource<T> next)
Returns a
Maybe which will subscribe to this Completable and once that is completed then
will subscribe to the next MaybeSource. |
<T> @NonNull Observable<T> |
andThen(@NonNull ObservableSource<T> next)
Returns an Observable which will subscribe to this Completable and once that is completed then
will subscribe to the
next ObservableSource. |
<T> @NonNull Flowable<T> |
andThen(@NonNull Publisher<T> next)
Returns a Flowable which will subscribe to this Completable and once that is completed then
will subscribe to the
next Flowable. |
<T> @NonNull Single<T> |
andThen(@NonNull SingleSource<T> next)
Returns a Single which will subscribe to this Completable and once that is completed then
will subscribe to the
next SingleSource. |
void |
blockingAwait()
Subscribes to and awaits the termination of this Completable instance in a blocking manner and
rethrows any exception emitted.
|
boolean |
blockingAwait(long timeout,
@NonNull TimeUnit unit)
Subscribes to and awaits the termination of this Completable instance in a blocking manner
with a specific timeout and rethrows any exception emitted within the timeout window.
|
@NonNull Completable |
cache()
Subscribes to this Completable only once, when the first CompletableObserver
subscribes to the result Completable, caches its terminal event
and relays/replays it to observers.
|
static @NonNull Completable |
complete()
Returns a Completable instance that completes immediately when subscribed to.
|
@NonNull Completable |
compose(@NonNull CompletableTransformer transformer)
Calls the given transformer function with this instance and returns the function's resulting
Completable.
|
static @NonNull Completable |
concat(@NonNull Iterable<? extends CompletableSource> sources)
Returns a Completable which completes only when all sources complete, one after another.
|
static @NonNull Completable |
concat(@NonNull Publisher<? extends CompletableSource> sources)
Returns a Completable which completes only when all sources complete, one after another.
|
static @NonNull Completable |
concat(@NonNull Publisher<? extends CompletableSource> sources,
int prefetch)
Returns a Completable which completes only when all sources complete, one after another.
|
static @NonNull Completable |
concatArray(CompletableSource... sources)
Returns a Completable which completes only when all sources complete, one after another.
|
@NonNull Completable |
concatWith(@NonNull CompletableSource other)
Concatenates this Completable with another Completable.
|
static @NonNull Completable |
create(@NonNull CompletableOnSubscribe source)
Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world.
|
static @NonNull Completable |
defer(@NonNull Supplier<? extends CompletableSource> completableSupplier)
Defers the subscription to a Completable instance returned by a supplier.
|
@NonNull Completable |
delay(long delay,
@NonNull TimeUnit unit)
Returns a Completable which delays the emission of the completion event by the given time.
|
@NonNull Completable |
delay(long delay,
@NonNull TimeUnit unit,
@NonNull Scheduler scheduler)
Returns a Completable which delays the emission of the completion event by the given time while
running on the specified scheduler.
|
@NonNull Completable |
delay(long delay,
@NonNull TimeUnit unit,
@NonNull Scheduler scheduler,
boolean delayError)
Returns a Completable which delays the emission of the completion event, and optionally the error as well, by the given time while
running on the specified scheduler.
|
@NonNull Completable |
delaySubscription(long delay,
@NonNull TimeUnit unit)
Returns a Completable that delays the subscription to the source CompletableSource by a given amount of time.
|
@NonNull Completable |
delaySubscription(long delay,
@NonNull TimeUnit unit,
@NonNull Scheduler scheduler)
Returns a Completable that delays the subscription to the source CompletableSource by a given amount of time,
both waiting and subscribing on a given Scheduler.
|
@NonNull Completable |
doAfterTerminate(@NonNull Action onAfterTerminate)
Returns a Completable instance that calls the given onTerminate callback after this Completable
completes normally or with an exception.
|
@NonNull Completable |
doFinally(@NonNull Action onFinally)
Calls the specified action after this Completable signals onError or onComplete or gets disposed by
the downstream.
|
@NonNull Completable |
doOnComplete(@NonNull Action onComplete)
Returns a Completable which calls the given onComplete callback if this Completable completes.
|
@NonNull Completable |
doOnDispose(@NonNull Action onDispose)
Calls the shared
Action if a CompletableObserver subscribed to the current
Completable disposes the common Disposable it received via onSubscribe. |
@NonNull Completable |
doOnError(@NonNull Consumer<? super Throwable> onError)
Returns a Completable which calls the given onError callback if this Completable emits an error.
|
@NonNull Completable |
doOnEvent(@NonNull Consumer<? super Throwable> onEvent)
Returns a Completable which calls the given onEvent callback with the (throwable) for an onError
or (null) for an onComplete signal from this Completable before delivering said signal to the downstream.
|
@NonNull Completable |
doOnSubscribe(@NonNull Consumer<? super Disposable> onSubscribe)
Returns a Completable instance that calls the given onSubscribe callback with the disposable
that child subscribers receive on subscription.
|
@NonNull Completable |
doOnTerminate(@NonNull Action onTerminate)
Returns a Completable instance that calls the given onTerminate callback just before this Completable
completes normally or with an exception.
|
static @NonNull Completable |
error(@NonNull Supplier<? extends Throwable> errorSupplier)
Creates a Completable which calls the given error supplier for each subscriber
and emits its returned Throwable.
|
static @NonNull Completable |
error(@NonNull Throwable error)
Creates a Completable instance that emits the given Throwable exception to subscribers.
|
static @NonNull Completable |
fromAction(@NonNull Action run)
Returns a Completable instance that runs the given Action for each subscriber and
emits either an unchecked exception or simply completes.
|
static @NonNull Completable |
fromCallable(@NonNull Callable<?> callable)
Returns a Completable which when subscribed, executes the callable function, ignores its
normal result and emits onError or onComplete only.
|
static @NonNull Completable |
fromCompletionStage(@NonNull CompletionStage<?> stage)
Signals completion (or error) when the
CompletionStage terminates. |
static @NonNull Completable |
fromFuture(@NonNull Future<?> future)
Returns a Completable instance that reacts to the termination of the given Future in a blocking fashion.
|
static <T> @NonNull Completable |
fromMaybe(@NonNull MaybeSource<T> maybe)
Returns a Completable instance that when subscribed to, subscribes to the
Maybe instance and
emits a completion event if the maybe emits onSuccess /onComplete or forwards any
onError events. |
static <T> @NonNull Completable |
fromObservable(@NonNull ObservableSource<T> observable)
Returns a Completable instance that subscribes to the given Observable, ignores all values and
emits only the terminal event.
|
static <T> @NonNull Completable |
fromPublisher(@NonNull Publisher<T> publisher)
Returns a Completable instance that subscribes to the given publisher, ignores all values and
emits only the terminal event.
|
static @NonNull Completable |
fromRunnable(@NonNull Runnable run)
Returns a Completable instance that runs the given Runnable for each subscriber and
emits either its exception or simply completes.
|
static <T> @NonNull Completable |
fromSingle(@NonNull SingleSource<T> single)
Returns a Completable instance that when subscribed to, subscribes to the Single instance and
emits a completion event if the single emits onSuccess or forwards any onError events.
|
static @NonNull Completable |
fromSupplier(@NonNull Supplier<?> supplier)
Returns a Completable which when subscribed, executes the supplier function, ignores its
normal result and emits onError or onComplete only.
|
@NonNull Completable |
hide()
Hides the identity of this Completable and its Disposable.
|
@NonNull Completable |
lift(@NonNull CompletableOperator onLift)
This method requires advanced knowledge about building operators, please consider
other standard composition methods first;
Returns a
Completable which, when subscribed to, invokes the apply(CompletableObserver) method
of the provided CompletableOperator for each individual downstream Completable and allows the
insertion of a custom operator by accessing the downstream's CompletableObserver during this subscription phase
and providing a new CompletableObserver , containing the custom operator's intended business logic, that will be
used in the subscription process going further upstream. |
<T> @NonNull Single<Notification<T>> |
materialize()
Maps the signal types of this Completable into a
Notification of the same kind
and emits it as a single success value to downstream. |
static @NonNull Completable |
merge(@NonNull Iterable<? extends CompletableSource> sources)
Returns a Completable instance that subscribes to all sources at once and
completes only when all source Completables complete or one of them emits an error.
|
static @NonNull Completable |
merge(@NonNull Publisher<? extends CompletableSource> sources)
Returns a Completable instance that subscribes to all sources at once and
completes only when all source Completables complete or one of them emits an error.
|
static @NonNull Completable |
merge(@NonNull Publisher<? extends CompletableSource> sources,
int maxConcurrency)
Returns a Completable instance that keeps subscriptions to a limited number of sources at once and
completes only when all source Completables complete or one of them emits an error.
|
static @NonNull Completable |
mergeArray(CompletableSource... sources)
Returns a Completable instance that subscribes to all sources at once and
completes only when all source Completables complete or one of them emits an error.
|
static @NonNull Completable |
mergeArrayDelayError(CompletableSource... sources)
Returns a CompletableConsumable that subscribes to all Completables in the source array and delays
any error emitted by either the sources observable or any of the inner Completables until all of
them terminate in a way or another.
|
static @NonNull Completable |
mergeDelayError(@NonNull Iterable<? extends CompletableSource> sources)
Returns a Completable that subscribes to all Completables in the source sequence and delays
any error emitted by either the sources observable or any of the inner Completables until all of
them terminate in a way or another.
|
static @NonNull Completable |
mergeDelayError(@NonNull Publisher<? extends CompletableSource> sources)
Returns a Completable that subscribes to all Completables in the source sequence and delays
any error emitted by either the sources observable or any of the inner Completables until all of
them terminate in a way or another.
|
static @NonNull Completable |
mergeDelayError(@NonNull Publisher<? extends CompletableSource> sources,
int maxConcurrency)
Returns a Completable that subscribes to a limited number of inner Completables at once in
the source sequence and delays any error emitted by either the sources
observable or any of the inner Completables until all of
them terminate in a way or another.
|
@NonNull Completable |
mergeWith(@NonNull CompletableSource other)
Returns a Completable which subscribes to this and the other Completable and completes
when both of them complete or one emits an error.
|
static @NonNull Completable |
never()
Returns a Completable that never calls onError or onComplete.
|
@NonNull Completable |
observeOn(@NonNull Scheduler scheduler)
Returns a Completable which emits the terminal events from the thread of the specified scheduler.
|
@NonNull Completable |
onErrorComplete()
Returns a Completable instance that if this Completable emits an error, it will emit an onComplete
and swallow the throwable.
|
@NonNull Completable |
onErrorComplete(@NonNull Predicate<? super Throwable> predicate)
Returns a Completable instance that if this Completable emits an error and the predicate returns
true, it will emit an onComplete and swallow the throwable.
|
@NonNull Completable |
onErrorResumeNext(@NonNull Function<? super Throwable,? extends CompletableSource> errorMapper)
Returns a Completable instance that when encounters an error from this Completable, calls the
specified mapper function that returns another Completable instance for it and resumes the
execution with it.
|
@NonNull Completable |
onTerminateDetach()
Nulls out references to the upstream producer and downstream CompletableObserver if
the sequence is terminated or downstream calls dispose().
|
@NonNull Completable |
repeat()
Returns a Completable that repeatedly subscribes to this Completable until disposed.
|
@NonNull Completable |
repeat(long times)
Returns a Completable that subscribes repeatedly at most the given times to this Completable.
|
@NonNull Completable |
repeatUntil(@NonNull BooleanSupplier stop)
Returns a Completable that repeatedly subscribes to this Completable so long as the given
stop supplier returns false.
|
@NonNull Completable |
repeatWhen(@NonNull Function<? super Flowable<Object>,? extends Publisher<?>> handler)
Returns a Completable instance that repeats when the Publisher returned by the handler
emits an item or completes when this Publisher emits a completed event.
|
@NonNull Completable |
retry()
Returns a Completable that retries this Completable as long as it emits an onError event.
|
@NonNull Completable |
retry(@NonNull BiPredicate<? super Integer,? super Throwable> predicate)
Returns a Completable that retries this Completable in case of an error as long as the predicate
returns true.
|
@NonNull Completable |
retry(long times)
Returns a Completable that when this Completable emits an error, retries at most the given
number of times before giving up and emitting the last error.
|
@NonNull Completable |
retry(long times,
@NonNull Predicate<? super Throwable> predicate)
Returns a Completable that when this Completable emits an error, retries at most times
or until the predicate returns false, whichever happens first and emitting the last error.
|
@NonNull Completable |
retry(@NonNull Predicate<? super Throwable> predicate)
Returns a Completable that when this Completable emits an error, calls the given predicate with
the latest exception to decide whether to resubscribe to this or not.
|
@NonNull Completable |
retryWhen(@NonNull Function<? super Flowable<Throwable>,? extends Publisher<?>> handler)
Returns a Completable which given a Publisher and when this Completable emits an error, delivers
that error through a Flowable and the Publisher should signal a value indicating a retry in response
or a terminal event indicating a termination.
|
@NonNull Completable |
startWith(@NonNull CompletableSource other)
Returns a Completable which first runs the other Completable
then this completable if the other completed normally.
|
<T> @NonNull Observable<T> |
startWith(@NonNull ObservableSource<T> other)
Returns an Observable which first delivers the events
of the other Observable then runs this CompletableConsumable.
|
<T> @NonNull Flowable<T> |
startWith(@NonNull Publisher<T> other)
Returns a Flowable which first delivers the events
of the other Publisher then runs this Completable.
|
@NonNull Disposable |
subscribe()
Subscribes to this CompletableConsumable and returns a Disposable which can be used to dispose
the subscription.
|
@NonNull Disposable |
subscribe(@NonNull Action onComplete)
Subscribes to this Completable and calls the given Action when this Completable
completes normally.
|
@NonNull Disposable |
subscribe(@NonNull Action onComplete,
@NonNull Consumer<? super Throwable> onError)
Subscribes to this Completable and calls back either the onError or onComplete functions.
|
void |
subscribe(@NonNull CompletableObserver observer)
Subscribes the given
CompletableObserver to this CompletableSource instance. |
protected abstract void |
subscribeActual(@NonNull CompletableObserver observer)
Implement this method to handle the incoming
CompletableObserver s and
perform the business logic in your operator. |
@NonNull Completable |
subscribeOn(@NonNull Scheduler scheduler)
Returns a Completable which subscribes the child subscriber on the specified scheduler, making
sure the subscription side-effects happen on that specific thread of the scheduler.
|
<E extends CompletableObserver> |
subscribeWith(E observer)
Subscribes a given CompletableObserver (subclass) to this Completable and returns the given
CompletableObserver as is.
|
@NonNull Completable |
takeUntil(@NonNull CompletableSource other)
Terminates the downstream if this or the other
Completable
terminates (wins the termination race) while disposing the connection to the losing source. |
@NonNull TestObserver<Void> |
test()
Creates a TestObserver and subscribes
it to this Completable.
|
@NonNull TestObserver<Void> |
test(boolean dispose)
Creates a TestObserver optionally in cancelled state, then subscribes it to this Completable.
|
@NonNull Completable |
timeout(long timeout,
@NonNull TimeUnit unit)
Returns a Completable that runs this Completable and emits a TimeoutException in case
this Completable doesn't complete within the given time.
|
@NonNull Completable |
timeout(long timeout,
@NonNull TimeUnit unit,
@NonNull CompletableSource other)
Returns a Completable that runs this Completable and switches to the other Completable
in case this Completable doesn't complete within the given time.
|
@NonNull Completable |
timeout(long timeout,
@NonNull TimeUnit unit,
@NonNull Scheduler scheduler)
Returns a Completable that runs this Completable and emits a TimeoutException in case
this Completable doesn't complete within the given time while "waiting" on the specified
Scheduler.
|
@NonNull Completable |
timeout(long timeout,
@NonNull TimeUnit unit,
@NonNull Scheduler scheduler,
@NonNull CompletableSource other)
Returns a Completable that runs this Completable and switches to the other Completable
in case this Completable doesn't complete within the given time while "waiting" on
the specified scheduler.
|
static @NonNull Completable |
timer(long delay,
@NonNull TimeUnit unit)
Returns a Completable instance that fires its onComplete event after the given delay elapsed.
|
static @NonNull Completable |
timer(long delay,
@NonNull TimeUnit unit,
@NonNull Scheduler scheduler)
Returns a Completable instance that fires its onComplete event after the given delay elapsed
by using the supplied scheduler.
|
<R> R |
to(@NonNull CompletableConverter<? extends R> converter)
Calls the specified converter function during assembly time and returns its resulting value.
|
<T> @NonNull CompletionStage<T> |
toCompletionStage(T defaultItem)
Signals the given default item when the upstream completes or signals the upstream error via
a
CompletionStage . |
<T> @NonNull Flowable<T> |
toFlowable()
Returns a Flowable which when subscribed to subscribes to this Completable and
relays the terminal events to the subscriber.
|
<T> @NonNull Maybe<T> |
toMaybe()
Converts this Completable into a
Maybe . |
<T> @NonNull Observable<T> |
toObservable()
Returns an Observable which when subscribed to subscribes to this Completable and
relays the terminal events to the subscriber.
|
<T> @NonNull Single<T> |
toSingle(@NonNull Supplier<? extends T> completionValueSupplier)
Converts this Completable into a Single which when this Completable completes normally,
calls the given supplier and emits its returned value through onSuccess.
|
<T> @NonNull Single<T> |
toSingleDefault(T completionValue)
Converts this Completable into a Single which when this Completable completes normally,
emits the given value through onSuccess.
|
static @NonNull Completable |
unsafeCreate(@NonNull CompletableSource source)
Constructs a Completable instance by wrapping the given source callback
without any safeguards; you should manage the lifecycle and response
to downstream disposal.
|
@NonNull Completable |
unsubscribeOn(@NonNull Scheduler scheduler)
Returns a Completable which makes sure when a subscriber disposes the subscription, the
dispose is called on the specified scheduler.
|
static <R> @NonNull Completable |
using(@NonNull Supplier<R> resourceSupplier,
@NonNull Function<? super R,? extends CompletableSource> completableFunction,
@NonNull Consumer<? super R> disposer)
Returns a Completable instance which manages a resource along
with a custom Completable instance while the subscription is active.
|
static <R> @NonNull Completable |
using(@NonNull Supplier<R> resourceSupplier,
@NonNull Function<? super R,? extends CompletableSource> completableFunction,
@NonNull Consumer<? super R> disposer,
boolean eager)
Returns a Completable instance which manages a resource along
with a custom Completable instance while the subscription is active and performs eager or lazy
resource disposition.
|
static @NonNull Completable |
wrap(@NonNull CompletableSource source)
Wraps the given CompletableSource into a Completable
if not already Completable.
|
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @SafeVarargs public static @NonNull Completable ambArray(@NonNull CompletableSource... sources)
ambArray
does not operate by default on a particular Scheduler
.sources
- the array of source Completables. A subscription to each source will
occur in the same order as in this array.NullPointerException
- if sources is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable amb(@NonNull @NonNull Iterable<? extends CompletableSource> sources)
amb
does not operate by default on a particular Scheduler
.sources
- the array of source Completables. A subscription to each source will
occur in the same order as in this Iterable.NullPointerException
- if sources is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable complete()
complete
does not operate by default on a particular Scheduler
.@CheckReturnValue @NonNull @SchedulerSupport(value="none") @SafeVarargs public static @NonNull Completable concatArray(@NonNull CompletableSource... sources)
concatArray
does not operate by default on a particular Scheduler
.sources
- the sources to concatenateNullPointerException
- if sources is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable concat(@NonNull @NonNull Iterable<? extends CompletableSource> sources)
concat
does not operate by default on a particular Scheduler
.sources
- the sources to concatenateNullPointerException
- if sources is null@CheckReturnValue @SchedulerSupport(value="none") @BackpressureSupport(value=FULL) @NonNull public static @NonNull Completable concat(@NonNull @NonNull Publisher<? extends CompletableSource> sources)
Completable
honors the backpressure of the downstream consumer
and expects the other Publisher
to honor it as well.concat
does not operate by default on a particular Scheduler
.sources
- the sources to concatenateNullPointerException
- if sources is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=FULL) public static @NonNull Completable concat(@NonNull @NonNull Publisher<? extends CompletableSource> sources, int prefetch)
Completable
honors the backpressure of the downstream consumer
and expects the other Publisher
to honor it as well.concat
does not operate by default on a particular Scheduler
.sources
- the sources to concatenateprefetch
- the number of sources to prefetch from the sourcesNullPointerException
- if sources is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable create(@NonNull @NonNull CompletableOnSubscribe source)
Example:
Completable.create(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event e) {
emitter.onComplete();
}
@Override
public void onFailure(Exception e) {
emitter.onError(e);
}
};
AutoCloseable c = api.someMethod(listener);
emitter.setCancellable(c::close);
});
Whenever a CompletableObserver
subscribes to the returned Completable
, the provided
CompletableOnSubscribe
callback is invoked with a fresh instance of a CompletableEmitter
that will interact only with that specific CompletableObserver
. If this CompletableObserver
disposes the flow (making CompletableEmitter.isDisposed()
return true),
other observers subscribed to the same returned Completable
are not affected.
create
does not operate by default on a particular Scheduler
.source
- the emitter that is called when a CompletableObserver subscribes to the returned Completable
CompletableOnSubscribe
,
Cancellable
@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable unsafeCreate(@NonNull @NonNull CompletableSource source)
unsafeCreate
does not operate by default on a particular Scheduler
.source
- the callback which will receive the CompletableObserver instances
when the Completable is subscribed to.NullPointerException
- if source is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable defer(@NonNull @NonNull Supplier<? extends CompletableSource> completableSupplier)
defer
does not operate by default on a particular Scheduler
.completableSupplier
- the supplier that returns the Completable that will be subscribed to.@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable error(@NonNull @NonNull Supplier<? extends Throwable> errorSupplier)
If the errorSupplier returns null, the child CompletableObservers will receive a NullPointerException.
error
does not operate by default on a particular Scheduler
.errorSupplier
- the error supplier, not nullNullPointerException
- if errorSupplier is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable error(@NonNull @NonNull Throwable error)
error
does not operate by default on a particular Scheduler
.error
- the Throwable instance to emit, not nullNullPointerException
- if error is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable fromAction(@NonNull @NonNull Action run)
fromAction
does not operate by default on a particular Scheduler
.Action
throws an exception, the respective Throwable
is
delivered to the downstream via CompletableObserver.onError(Throwable)
,
except when the downstream has disposed this Completable
source.
In this latter case, the Throwable
is delivered to the global error handler via
RxJavaPlugins.onError(Throwable)
as an UndeliverableException
.
run
- the runnable to run for each subscriberNullPointerException
- if run is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable fromCallable(@NonNull @NonNull Callable<?> callable)
fromCallable
does not operate by default on a particular Scheduler
.Callable
throws an exception, the respective Throwable
is
delivered to the downstream via CompletableObserver.onError(Throwable)
,
except when the downstream has disposed this Completable
source.
In this latter case, the Throwable
is delivered to the global error handler via
RxJavaPlugins.onError(Throwable)
as an UndeliverableException
.
callable
- the callable instance to execute for each subscriberdefer(Supplier)
,
fromSupplier(Supplier)
@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable fromFuture(@NonNull @NonNull Future<?> future)
Note that if any of the observers to this Completable call dispose, this Completable will cancel the future.
fromFuture
does not operate by default on a particular Scheduler
.future
- the future to react to@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static <T> @NonNull Completable fromMaybe(@NonNull @NonNull MaybeSource<T> maybe)
Maybe
instance and
emits a completion event if the maybe emits onSuccess
/onComplete
or forwards any
onError
events.
fromMaybe
does not operate by default on a particular Scheduler
.History: 2.1.17 - beta
T
- the value type of the MaybeSource
elementmaybe
- the Maybe instance to subscribe to, not nullNullPointerException
- if single is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable fromRunnable(@NonNull @NonNull Runnable run)
fromRunnable
does not operate by default on a particular Scheduler
.Runnable
throws an exception, the respective Throwable
is
delivered to the downstream via CompletableObserver.onError(Throwable)
,
except when the downstream has disposed this Completable
source.
In this latter case, the Throwable
is delivered to the global error handler via
RxJavaPlugins.onError(Throwable)
as an UndeliverableException
.
run
- the runnable to run for each subscriberNullPointerException
- if run is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static <T> @NonNull Completable fromObservable(@NonNull @NonNull ObservableSource<T> observable)
fromObservable
does not operate by default on a particular Scheduler
.T
- the type of the Observableobservable
- the Observable instance to subscribe to, not nullNullPointerException
- if flowable is null@CheckReturnValue @NonNull @BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public static <T> @NonNull Completable fromPublisher(@NonNull @NonNull Publisher<T> publisher)
The Publisher
must follow the
Reactive-Streams specification.
Violating the specification may result in undefined behavior.
If possible, use create(CompletableOnSubscribe)
to create a
source-like Completable
instead.
Note that even though Publisher
appears to be a functional interface, it
is not recommended to implement it through a lambda as the specification requires
state management that is not achievable with a stateless lambda.
Completable
honors the backpressure of the downstream consumer
and expects the other Publisher
to honor it as well.fromPublisher
does not operate by default on a particular Scheduler
.T
- the type of the publisherpublisher
- the Publisher instance to subscribe to, not nullNullPointerException
- if publisher is nullcreate(CompletableOnSubscribe)
@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static <T> @NonNull Completable fromSingle(@NonNull @NonNull SingleSource<T> single)
fromSingle
does not operate by default on a particular Scheduler
.T
- the value type of the Singlesingle
- the Single instance to subscribe to, not nullNullPointerException
- if single is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable fromSupplier(@NonNull @NonNull Supplier<?> supplier)
fromSupplier
does not operate by default on a particular Scheduler
.Supplier
throws an exception, the respective Throwable
is
delivered to the downstream via CompletableObserver.onError(Throwable)
,
except when the downstream has disposed this Completable
source.
In this latter case, the Throwable
is delivered to the global error handler via
RxJavaPlugins.onError(Throwable)
as an UndeliverableException
.
supplier
- the Supplier instance to execute for each subscriberdefer(Supplier)
,
fromCallable(Callable)
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @SafeVarargs public static @NonNull Completable mergeArray(@NonNull CompletableSource... sources)
mergeArray
does not operate by default on a particular Scheduler
.CompletableSource
s signal a Throwable
via onError
, the resulting
Completable
terminates with that Throwable
and all other source CompletableSource
s are disposed.
If more than one CompletableSource
signals an error, the resulting Completable
may terminate with the
first one's error or, depending on the concurrency of the sources, may terminate with a
CompositeException
containing two or more of the various error signals.
Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via
RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s
signaled by source(s) after the returned Completable
has been disposed or terminated with a
(composite) error will be sent to the same global error handler.
Use mergeArrayDelayError(CompletableSource...)
to merge sources and terminate only when all source CompletableSource
s
have completed or failed with an error.
sources
- the iterable sequence of sources.NullPointerException
- if sources is nullmergeArrayDelayError(CompletableSource...)
@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable merge(@NonNull @NonNull Iterable<? extends CompletableSource> sources)
merge
does not operate by default on a particular Scheduler
.CompletableSource
s signal a Throwable
via onError
, the resulting
Completable
terminates with that Throwable
and all other source CompletableSource
s are disposed.
If more than one CompletableSource
signals an error, the resulting Completable
may terminate with the
first one's error or, depending on the concurrency of the sources, may terminate with a
CompositeException
containing two or more of the various error signals.
Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via
RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s
signaled by source(s) after the returned Completable
has been disposed or terminated with a
(composite) error will be sent to the same global error handler.
Use mergeDelayError(Iterable)
to merge sources and terminate only when all source CompletableSource
s
have completed or failed with an error.
sources
- the iterable sequence of sources.NullPointerException
- if sources is nullmergeDelayError(Iterable)
@CheckReturnValue @SchedulerSupport(value="none") @BackpressureSupport(value=UNBOUNDED_IN) @NonNull public static @NonNull Completable merge(@NonNull @NonNull Publisher<? extends CompletableSource> sources)
Completable
honors the backpressure of the downstream consumer
and expects the other Publisher
to honor it as well.merge
does not operate by default on a particular Scheduler
.CompletableSource
s signal a Throwable
via onError
, the resulting
Completable
terminates with that Throwable
and all other source CompletableSource
s are disposed.
If more than one CompletableSource
signals an error, the resulting Completable
may terminate with the
first one's error or, depending on the concurrency of the sources, may terminate with a
CompositeException
containing two or more of the various error signals.
Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via
RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s
signaled by source(s) after the returned Completable
has been disposed or terminated with a
(composite) error will be sent to the same global error handler.
Use mergeDelayError(Publisher)
to merge sources and terminate only when all source CompletableSource
s
have completed or failed with an error.
sources
- the iterable sequence of sources.NullPointerException
- if sources is nullmergeDelayError(Publisher)
@CheckReturnValue @SchedulerSupport(value="none") @BackpressureSupport(value=FULL) @NonNull public static @NonNull Completable merge(@NonNull @NonNull Publisher<? extends CompletableSource> sources, int maxConcurrency)
Completable
honors the backpressure of the downstream consumer
and expects the other Publisher
to honor it as well.merge
does not operate by default on a particular Scheduler
.CompletableSource
s signal a Throwable
via onError
, the resulting
Completable
terminates with that Throwable
and all other source CompletableSource
s are disposed.
If more than one CompletableSource
signals an error, the resulting Completable
may terminate with the
first one's error or, depending on the concurrency of the sources, may terminate with a
CompositeException
containing two or more of the various error signals.
Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via
RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s
signaled by source(s) after the returned Completable
has been disposed or terminated with a
(composite) error will be sent to the same global error handler.
Use mergeDelayError(Publisher, int)
to merge sources and terminate only when all source CompletableSource
s
have completed or failed with an error.
sources
- the iterable sequence of sources.maxConcurrency
- the maximum number of concurrent subscriptionsNullPointerException
- if sources is nullIllegalArgumentException
- if maxConcurrency is less than 1mergeDelayError(Publisher, int)
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @SafeVarargs public static @NonNull Completable mergeArrayDelayError(@NonNull CompletableSource... sources)
mergeArrayDelayError
does not operate by default on a particular Scheduler
.sources
- the array of CompletablesNullPointerException
- if sources is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable mergeDelayError(@NonNull @NonNull Iterable<? extends CompletableSource> sources)
mergeDelayError
does not operate by default on a particular Scheduler
.sources
- the sequence of CompletablesNullPointerException
- if sources is null@CheckReturnValue @SchedulerSupport(value="none") @BackpressureSupport(value=UNBOUNDED_IN) @NonNull public static @NonNull Completable mergeDelayError(@NonNull @NonNull Publisher<? extends CompletableSource> sources)
Completable
honors the backpressure of the downstream consumer
and expects the other Publisher
to honor it as well.mergeDelayError
does not operate by default on a particular Scheduler
.sources
- the sequence of CompletablesNullPointerException
- if sources is null@CheckReturnValue @SchedulerSupport(value="none") @BackpressureSupport(value=FULL) @NonNull public static @NonNull Completable mergeDelayError(@NonNull @NonNull Publisher<? extends CompletableSource> sources, int maxConcurrency)
Completable
honors the backpressure of the downstream consumer
and expects the other Publisher
to honor it as well.mergeDelayError
does not operate by default on a particular Scheduler
.sources
- the sequence of CompletablesmaxConcurrency
- the maximum number of concurrent subscriptions to CompletablesNullPointerException
- if sources is null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public static @NonNull Completable never()
never
does not operate by default on a particular Scheduler
.@CheckReturnValue @SchedulerSupport(value="io.reactivex:computation") @NonNull public static @NonNull Completable timer(long delay, @NonNull @NonNull TimeUnit unit)
timer
does operate by default on the computation
Scheduler
.delay
- the delay timeunit
- the delay unit@CheckReturnValue @NonNull @SchedulerSupport(value="custom") public static @NonNull Completable timer(long delay, @NonNull @NonNull TimeUnit unit, @NonNull @NonNull Scheduler scheduler)
timer
operates on the Scheduler
you specify.delay
- the delay timeunit
- the delay unitscheduler
- the scheduler where to emit the complete event@CheckReturnValue @SchedulerSupport(value="none") @NonNull public static <R> @NonNull Completable using(@NonNull @NonNull Supplier<R> resourceSupplier, @NonNull @NonNull Function<? super R,? extends CompletableSource> completableFunction, @NonNull @NonNull Consumer<? super R> disposer)
This overload disposes eagerly before the terminal event is emitted.
using
does not operate by default on a particular Scheduler
.R
- the resource typeresourceSupplier
- the supplier that returns a resource to be managed.completableFunction
- the function that given a resource returns a Completable instance that will be subscribed todisposer
- the consumer that disposes the resource created by the resource supplier@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static <R> @NonNull Completable using(@NonNull @NonNull Supplier<R> resourceSupplier, @NonNull @NonNull Function<? super R,? extends CompletableSource> completableFunction, @NonNull @NonNull Consumer<? super R> disposer, boolean eager)
If this overload performs a lazy disposal after the terminal event is emitted. Exceptions thrown at this time will be delivered to RxJavaPlugins only.
using
does not operate by default on a particular Scheduler
.R
- the resource typeresourceSupplier
- the supplier that returns a resource to be managedcompletableFunction
- the function that given a resource returns a non-null
Completable instance that will be subscribed todisposer
- the consumer that disposes the resource created by the resource suppliereager
- If true
then resource disposal will happen either on a dispose()
call before the upstream is disposed
or just before the emission of a terminal event (onComplete
or onError
).
If false
the resource disposal will happen either on a dispose()
call after the upstream is disposed
or just after the emission of a terminal event (onComplete
or onError
).@CheckReturnValue @NonNull @SchedulerSupport(value="none") public static @NonNull Completable wrap(@NonNull @NonNull CompletableSource source)
wrap
does not operate by default on a particular Scheduler
.source
- the source to wrapNullPointerException
- if source is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Completable ambWith(@NonNull @NonNull CompletableSource other)
ambWith
does not operate by default on a particular Scheduler
.other
- the other Completable, not null. A subscription to this provided source will occur after subscribing
to the current source.NullPointerException
- if other is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final <T> @NonNull Observable<T> andThen(@NonNull @NonNull ObservableSource<T> next)
next
ObservableSource. An error event from this Completable will be
propagated to the downstream subscriber and will result in skipping the subscription of the
Observable.
andThen
does not operate by default on a particular Scheduler
.T
- the value type of the next ObservableSourcenext
- the Observable to subscribe after this Completable is completed, not nullNullPointerException
- if next is null@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <T> @NonNull Flowable<T> andThen(@NonNull @NonNull Publisher<T> next)
next
Flowable. An error event from this Completable will be
propagated to the downstream subscriber and will result in skipping the subscription of the
Publisher.
Flowable
honors the backpressure of the downstream consumer
and expects the other Publisher
to honor it as well.andThen
does not operate by default on a particular Scheduler
.T
- the value type of the next Publishernext
- the Publisher to subscribe after this Completable is completed, not nullNullPointerException
- if next is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final <T> @NonNull Single<T> andThen(@NonNull @NonNull SingleSource<T> next)
next
SingleSource. An error event from this Completable will be
propagated to the downstream subscriber and will result in skipping the subscription of the
Single.
andThen
does not operate by default on a particular Scheduler
.T
- the value type of the next SingleSourcenext
- the Single to subscribe after this Completable is completed, not null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final <T> @NonNull Maybe<T> andThen(@NonNull @NonNull MaybeSource<T> next)
Maybe
which will subscribe to this Completable and once that is completed then
will subscribe to the next
MaybeSource. An error event from this Completable will be
propagated to the downstream subscriber and will result in skipping the subscription of the
Maybe.
andThen
does not operate by default on a particular Scheduler
.T
- the value type of the next MaybeSourcenext
- the Maybe to subscribe after this Completable is completed, not null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable andThen(@NonNull @NonNull CompletableSource next)
This is an alias for concatWith(CompletableSource)
.
andThen
does not operate by default on a particular Scheduler
.next
- the other Completable, not nullNullPointerException
- if other is null@SchedulerSupport(value="none") public final void blockingAwait()
blockingAwait
does not operate by default on a particular Scheduler
.Exception
into RuntimeException
and throws that. Otherwise, RuntimeException
s and
Error
s are rethrown as they are.RuntimeException
- wrapping an InterruptedException if the current thread is interrupted@CheckReturnValue @SchedulerSupport(value="none") public final boolean blockingAwait(long timeout, @NonNull @NonNull TimeUnit unit)
blockingAwait
does not operate by default on a particular Scheduler
.Exception
into RuntimeException
and throws that. Otherwise, RuntimeException
s and
Error
s are rethrown as they are.timeout
- the timeout valueunit
- the timeout unitRuntimeException
- wrapping an InterruptedException if the current thread is interrupted@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable cache()
Note that this operator doesn't allow disposing the connection of the upstream source.
cache
does not operate by default on a particular Scheduler
.History: 2.0.4 - experimental
@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable compose(@NonNull @NonNull CompletableTransformer transformer)
compose
does not operate by default on a particular Scheduler
.transformer
- the transformer function, not nullNullPointerException
- if transformer is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Completable concatWith(@NonNull @NonNull CompletableSource other)
concatWith
does not operate by default on a particular Scheduler
.other
- the other Completable, not nullNullPointerException
- if other is nullandThen(MaybeSource)
,
andThen(ObservableSource)
,
andThen(SingleSource)
,
andThen(Publisher)
@CheckReturnValue @SchedulerSupport(value="io.reactivex:computation") @NonNull public final @NonNull Completable delay(long delay, @NonNull @NonNull TimeUnit unit)
delay
does operate by default on the computation
Scheduler
.delay
- the delay timeunit
- the delay unitNullPointerException
- if unit is null@CheckReturnValue @SchedulerSupport(value="custom") @NonNull public final @NonNull Completable delay(long delay, @NonNull @NonNull TimeUnit unit, @NonNull @NonNull Scheduler scheduler)
delay
operates on the Scheduler
you specify.delay
- the delay timeunit
- the delay unitscheduler
- the scheduler to run the delayed completion onNullPointerException
- if unit or scheduler is null@CheckReturnValue @NonNull @SchedulerSupport(value="custom") public final @NonNull Completable delay(long delay, @NonNull @NonNull TimeUnit unit, @NonNull @NonNull Scheduler scheduler, boolean delayError)
delay
operates on the Scheduler
you specify.delay
- the delay timeunit
- the delay unitscheduler
- the scheduler to run the delayed completion ondelayError
- delay the error emission as well?NullPointerException
- if unit or scheduler is null@CheckReturnValue @SchedulerSupport(value="io.reactivex:computation") @NonNull public final @NonNull Completable delaySubscription(long delay, @NonNull @NonNull TimeUnit unit)
delaySubscription
operates by default on the computation
Scheduler
.History: 2.2.3 - experimental
delay
- the time to delay the subscriptionunit
- the time unit of delay
@CheckReturnValue @SchedulerSupport(value="custom") @NonNull public final @NonNull Completable delaySubscription(long delay, @NonNull @NonNull TimeUnit unit, @NonNull @NonNull Scheduler scheduler)
Scheduler
this operator will use.History: 2.2.3 - experimental
delay
- the time to delay the subscriptionunit
- the time unit of delay
scheduler
- the Scheduler on which the waiting and subscription will happen@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable doOnComplete(@NonNull @NonNull Action onComplete)
doOnComplete
does not operate by default on a particular Scheduler
.onComplete
- the callback to call when this emits an onComplete eventNullPointerException
- if onComplete is nulldoFinally(Action)
@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable doOnDispose(@NonNull @NonNull Action onDispose)
Action
if a CompletableObserver subscribed to the current
Completable disposes the common Disposable it received via onSubscribe.
doOnDispose
does not operate by default on a particular Scheduler
.onDispose
- the action to call when the child subscriber disposes the subscriptionNullPointerException
- if onDispose is null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable doOnError(@NonNull @NonNull Consumer<? super Throwable> onError)
doOnError
does not operate by default on a particular Scheduler
.onError
- the error callbackNullPointerException
- if onError is nulldoFinally(Action)
@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Completable doOnEvent(@NonNull @NonNull Consumer<? super Throwable> onEvent)
doOnEvent
does not operate by default on a particular Scheduler
.onEvent
- the event callbackNullPointerException
- if onEvent is null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable doOnSubscribe(@NonNull @NonNull Consumer<? super Disposable> onSubscribe)
doOnSubscribe
does not operate by default on a particular Scheduler
.onSubscribe
- the callback called when a child subscriber subscribesNullPointerException
- if onSubscribe is null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable doOnTerminate(@NonNull @NonNull Action onTerminate)
doOnTerminate
does not operate by default on a particular Scheduler
.onTerminate
- the callback to call just before this Completable terminatesdoFinally(Action)
@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable doAfterTerminate(@NonNull @NonNull Action onAfterTerminate)
doAfterTerminate
does not operate by default on a particular Scheduler
.onAfterTerminate
- the callback to call after this Completable terminatesdoFinally(Action)
@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Completable doFinally(@NonNull @NonNull Action onFinally)
In case of a race between a terminal event and a dispose call, the provided onFinally
action
is executed once per subscription.
Note that the onFinally
action is shared between subscriptions and as such
should be thread-safe.
doFinally
does not operate by default on a particular Scheduler
.History: 2.0.1 - experimental
onFinally
- the action called when this Completable terminates or gets disposed@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Completable lift(@NonNull @NonNull CompletableOperator onLift)
Completable
which, when subscribed to, invokes the apply(CompletableObserver)
method
of the provided CompletableOperator
for each individual downstream Completable
and allows the
insertion of a custom operator by accessing the downstream's CompletableObserver
during this subscription phase
and providing a new CompletableObserver
, containing the custom operator's intended business logic, that will be
used in the subscription process going further upstream.
Generally, such a new CompletableObserver
will wrap the downstream's CompletableObserver
and forwards the
onError
and onComplete
events from the upstream directly or according to the
emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
flow control calls of dispose
and isDisposed
that would have traveled upstream and perform
additional actions depending on the same business logic requirements.
Example:
// Step 1: Create the consumer type that will be returned by the CompletableOperator.apply():
public final class CustomCompletableObserver implements CompletableObserver, Disposable {
// The downstream's CompletableObserver that will receive the onXXX events
final CompletableObserver downstream;
// The connection to the upstream source that will call this class' onXXX methods
Disposable upstream;
// The constructor takes the downstream subscriber and usually any other parameters
public CustomCompletableObserver(CompletableObserver downstream) {
this.downstream = downstream;
}
// In the subscription phase, the upstream sends a Disposable to this class
// and subsequently this class has to send a Disposable to the downstream.
// Note that relaying the upstream's Disposable directly is not allowed in RxJava
@Override
public void onSubscribe(Disposable d) {
if (upstream != null) {
d.dispose();
} else {
upstream = d;
downstream.onSubscribe(this);
}
}
// Some operators may handle the upstream's error while others
// could just forward it to the downstream.
@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}
// When the upstream completes, usually the downstream should complete as well.
// In completable, this could also mean doing some side-effects
@Override
public void onComplete() {
System.out.println("Sequence completed");
downstream.onComplete();
}
// Some operators may use their own resources which should be cleaned up if
// the downstream disposes the flow before it completed. Operators without
// resources can simply forward the dispose to the upstream.
// In some cases, a disposed flag may be set by this method so that other parts
// of this class may detect the dispose and stop sending events
// to the downstream.
@Override
public void dispose() {
upstream.dispose();
}
// Some operators may simply forward the call to the upstream while others
// can return the disposed flag set in dispose().
@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
// Step 2: Create a class that implements the CompletableOperator interface and
// returns the custom consumer type from above in its apply() method.
// Such class may define additional parameters to be submitted to
// the custom consumer type.
final class CustomCompletableOperator implements CompletableOperator {
@Override
public CompletableObserver apply(CompletableObserver upstream) {
return new CustomCompletableObserver(upstream);
}
}
// Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
// or reusing an existing one.
Completable.complete()
.lift(new CustomCompletableOperator())
.test()
.assertResult();
Creating custom operators can be complicated and it is recommended one consults the RxJava wiki: Writing operators page about the tools, requirements, rules, considerations and pitfalls of implementing them.
Note that implementing custom operators via this lift()
method adds slightly more overhead by requiring
an additional allocation and indirection per assembled flows. Instead, extending the abstract Completable
class and creating a CompletableTransformer
with it is recommended.
Note also that it is not possible to stop the subscription phase in lift()
as the apply()
method
requires a non-null CompletableObserver
instance to be returned, which is then unconditionally subscribed to
the upstream Completable
. For example, if the operator decided there is no reason to subscribe to the
upstream source because of some optimization possibility or a failure to prepare the operator, it still has to
return a CompletableObserver
that should immediately dispose the upstream's Disposable
in its
onSubscribe
method. Again, using a CompletableTransformer
and extending the Completable
is
a better option as subscribeActual(io.reactivex.rxjava3.core.CompletableObserver)
can decide to not subscribe to its upstream after all.
lift
does not operate by default on a particular Scheduler
, however, the
CompletableOperator
may use a Scheduler
to support its own asynchronous behavior.onLift
- the CompletableOperator
that receives the downstream's CompletableObserver
and should return
a CompletableObserver
with custom behavior to be used as the consumer for the current
Completable
.compose(CompletableTransformer)
@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final <T> @NonNull Single<Notification<T>> materialize()
Notification
of the same kind
and emits it as a single success value to downstream.
materialize
does not operate by default on a particular Scheduler
.History: 2.2.4 - experimental
T
- the intended target element type of the notificationSingle.dematerialize(Function)
@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Completable mergeWith(@NonNull @NonNull CompletableSource other)
mergeWith
does not operate by default on a particular Scheduler
.other
- the other Completable instanceNullPointerException
- if other is null@CheckReturnValue @NonNull @SchedulerSupport(value="custom") public final @NonNull Completable observeOn(@NonNull @NonNull Scheduler scheduler)
observeOn
operates on a Scheduler
you specify.scheduler
- the scheduler to emit terminal events onNullPointerException
- if scheduler is null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable onErrorComplete()
onErrorComplete
does not operate by default on a particular Scheduler
.@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Completable onErrorComplete(@NonNull @NonNull Predicate<? super Throwable> predicate)
onErrorComplete
does not operate by default on a particular Scheduler
.predicate
- the predicate to call when an Throwable is emitted which should return true
if the Throwable should be swallowed and replaced with an onComplete.@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Completable onErrorResumeNext(@NonNull @NonNull Function<? super Throwable,? extends CompletableSource> errorMapper)
onErrorResumeNext
does not operate by default on a particular Scheduler
.errorMapper
- the mapper function that takes the error and should return a Completable as
continuation.@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable onTerminateDetach()
onTerminateDetach
does not operate by default on a particular Scheduler
.History: 2.1.5 - experimental
@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable repeat()
repeat
does not operate by default on a particular Scheduler
.@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable repeat(long times)
repeat
does not operate by default on a particular Scheduler
.times
- the number of times the resubscription should happenIllegalArgumentException
- if times is less than zero@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable repeatUntil(@NonNull @NonNull BooleanSupplier stop)
repeatUntil
does not operate by default on a particular Scheduler
.stop
- the supplier that should return true to stop resubscribing.NullPointerException
- if stop is null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable repeatWhen(@NonNull @NonNull Function<? super Flowable<Object>,? extends Publisher<?>> handler)
repeatWhen
does not operate by default on a particular Scheduler
.handler
- the function that transforms the stream of values indicating the completion of
this Completable and returns a Publisher that emits items for repeating or completes to indicate the
repetition should stopNullPointerException
- if stop is null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable retry()
retry
does not operate by default on a particular Scheduler
.@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable retry(@NonNull @NonNull BiPredicate<? super Integer,? super Throwable> predicate)
retry
does not operate by default on a particular Scheduler
.predicate
- the predicate called when this emits an error with the repeat count and the latest exception
and should return true to retry.@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable retry(long times)
retry
does not operate by default on a particular Scheduler
.times
- the number of times to resubscribe if the current Completable failsIllegalArgumentException
- if times is negative@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable retry(long times, @NonNull @NonNull Predicate<? super Throwable> predicate)
retry
does not operate by default on a particular Scheduler
.History: 2.1.8 - experimental
times
- the number of times to resubscribe if the current Completable failspredicate
- the predicate that is called with the latest throwable and should return
true to indicate the returned Completable should resubscribe to this Completable.NullPointerException
- if predicate is nullIllegalArgumentException
- if times is negative@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable retry(@NonNull @NonNull Predicate<? super Throwable> predicate)
retry
does not operate by default on a particular Scheduler
.predicate
- the predicate that is called with the latest throwable and should return
true to indicate the returned Completable should resubscribe to this Completable.NullPointerException
- if predicate is null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable retryWhen(@NonNull @NonNull Function<? super Flowable<Throwable>,? extends Publisher<?>> handler)
Note that the inner Publisher
returned by the handler function should signal
either onNext
, onError
or onComplete
in response to the received
Throwable
to indicate the operator should retry or terminate. If the upstream to
the operator is asynchronous, signalling onNext followed by onComplete immediately may
result in the sequence to be completed immediately. Similarly, if this inner
Publisher
signals onError
or onComplete
while the upstream is
active, the sequence is terminated with the same signal immediately.
The following example demonstrates how to retry an asynchronous source with a delay:
Completable.timer(1, TimeUnit.SECONDS)
.doOnSubscribe(s -> System.out.println("subscribing"))
.doOnComplete(() -> { throw new RuntimeException(); })
.retryWhen(errors -> {
AtomicInteger counter = new AtomicInteger();
return errors
.takeWhile(e -> counter.getAndIncrement() != 3)
.flatMap(e -> {
System.out.println("delay retry by " + counter.get() + " second(s)");
return Flowable.timer(counter.get(), TimeUnit.SECONDS);
});
})
.blockingAwait();
retryWhen
does not operate by default on a particular Scheduler
.handler
- the handler that receives a Flowable delivering Throwables and should return a Publisher that
emits items to indicate retries or emits terminal events to indicate termination.NullPointerException
- if handler is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Completable startWith(@NonNull @NonNull CompletableSource other)
startWith
does not operate by default on a particular Scheduler
.other
- the other completable to run firstNullPointerException
- if other is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final <T> @NonNull Observable<T> startWith(@NonNull @NonNull ObservableSource<T> other)
startWith
does not operate by default on a particular Scheduler
.T
- the value typeother
- the other Observable to run firstNullPointerException
- if other is null@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <T> @NonNull Flowable<T> startWith(@NonNull @NonNull Publisher<T> other)
Flowable
honors the backpressure of the downstream consumer
and expects the other Publisher
to honor it as well.startWith
does not operate by default on a particular Scheduler
.T
- the value typeother
- the other Publisher to run firstNullPointerException
- if other is null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull Completable hide()
Allows preventing certain identity-based optimizations (fusion).
hide
does not operate by default on a particular Scheduler
.History: 2.0.5 - experimental
@SchedulerSupport(value="none") @NonNull public final @NonNull Disposable subscribe()
subscribe
does not operate by default on a particular Scheduler
.@SchedulerSupport(value="none") public final void subscribe(@NonNull @NonNull CompletableObserver observer)
CompletableSource
CompletableObserver
to this CompletableSource
instance.subscribe
in interface CompletableSource
observer
- the CompletableObserver
, not null
protected abstract void subscribeActual(@NonNull @NonNull CompletableObserver observer)
CompletableObserver
s and
perform the business logic in your operator.
There is no need to call any of the plugin hooks on the current Completable
instance or
the CompletableObserver
; all hooks and basic safeguards have been
applied by subscribe(CompletableObserver)
before this method gets called.
observer
- the CompletableObserver instance, never null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final <E extends CompletableObserver> E subscribeWith(E observer)
Usage example:
Completable source = Completable.complete().delay(1, TimeUnit.SECONDS);
CompositeDisposable composite = new CompositeDisposable();
DisposableCompletableObserver ds = new DisposableCompletableObserver() {
// ...
};
composite.add(source.subscribeWith(ds));
subscribeWith
does not operate by default on a particular Scheduler
.E
- the type of the CompletableObserver to use and returnobserver
- the CompletableObserver (subclass) to use and return, not nullobserver
NullPointerException
- if observer
is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Disposable subscribe(@NonNull @NonNull Action onComplete, @NonNull @NonNull Consumer<? super Throwable> onError)
subscribe
does not operate by default on a particular Scheduler
.onComplete
- the runnable that is called if the Completable completes normallyonError
- the consumer that is called if this Completable emits an errorNullPointerException
- if either callback is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Disposable subscribe(@NonNull @NonNull Action onComplete)
If the Completable emits an error, it is wrapped into an
OnErrorNotImplementedException
and routed to the RxJavaPlugins.onError handler.
subscribe
does not operate by default on a particular Scheduler
.onComplete
- the runnable called when this Completable completes normally@CheckReturnValue @NonNull @SchedulerSupport(value="custom") public final @NonNull Completable subscribeOn(@NonNull @NonNull Scheduler scheduler)
subscribeOn
operates on a Scheduler
you specify.scheduler
- the Scheduler to subscribe onNullPointerException
- if scheduler is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final @NonNull Completable takeUntil(@NonNull @NonNull CompletableSource other)
Completable
terminates (wins the termination race) while disposing the connection to the losing source.
takeUntil
does not operate by default on a particular Scheduler
.RxJavaPlugins.onError(Throwable)
.History: 2.1.17 - experimental
other
- the other completable source to observe for the terminal signals@CheckReturnValue @SchedulerSupport(value="io.reactivex:computation") @NonNull public final @NonNull Completable timeout(long timeout, @NonNull @NonNull TimeUnit unit)
timeout
signals the TimeoutException on the computation
Scheduler
.timeout
- the timeout valueunit
- the timeout unitNullPointerException
- if unit is null@CheckReturnValue @NonNull @SchedulerSupport(value="io.reactivex:computation") public final @NonNull Completable timeout(long timeout, @NonNull @NonNull TimeUnit unit, @NonNull @NonNull CompletableSource other)
timeout
subscribes to the other CompletableSource on
the computation
Scheduler
.timeout
- the timeout valueunit
- the timeout unitother
- the other Completable instance to switch to in case of a timeoutNullPointerException
- if unit or other is null@CheckReturnValue @SchedulerSupport(value="custom") @NonNull public final @NonNull Completable timeout(long timeout, @NonNull @NonNull TimeUnit unit, @NonNull @NonNull Scheduler scheduler)
timeout
signals the TimeoutException on the Scheduler
you specify.timeout
- the timeout valueunit
- the timeout unitscheduler
- the scheduler to use to wait for completionNullPointerException
- if unit or scheduler is null@CheckReturnValue @NonNull @SchedulerSupport(value="custom") public final @NonNull Completable timeout(long timeout, @NonNull @NonNull TimeUnit unit, @NonNull @NonNull Scheduler scheduler, @NonNull @NonNull CompletableSource other)
timeout
subscribes to the other CompletableSource on
the Scheduler
you specify.timeout
- the timeout valueunit
- the timeout unitscheduler
- the scheduler to use to wait for completionother
- the other Completable instance to switch to in case of a timeoutNullPointerException
- if unit, scheduler or other is null@CheckReturnValue @SchedulerSupport(value="none") public final <R> R to(@NonNull @NonNull CompletableConverter<? extends R> converter)
This allows fluent conversion to any other type.
to
does not operate by default on a particular Scheduler
.History: 2.1.7 - experimental
R
- the resulting object typeconverter
- the function that receives the current Completable instance and returns a valueNullPointerException
- if converter is null@CheckReturnValue @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @NonNull public final <T> @NonNull Flowable<T> toFlowable()
Flowable
honors the backpressure of the downstream consumer.toFlowable
does not operate by default on a particular Scheduler
.T
- the value type@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final <T> @NonNull Maybe<T> toMaybe()
Maybe
.
toMaybe
does not operate by default on a particular Scheduler
.T
- the value typeMaybe
that only calls onComplete
or onError
, based on which one is
called by the source Completable.@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final <T> @NonNull Observable<T> toObservable()
toObservable
does not operate by default on a particular Scheduler
.T
- the value type@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final <T> @NonNull Single<T> toSingle(@NonNull @NonNull Supplier<? extends T> completionValueSupplier)
toSingle
does not operate by default on a particular Scheduler
.T
- the value typecompletionValueSupplier
- the value supplier called when this Completable completes normallyNullPointerException
- if completionValueSupplier is null@CheckReturnValue @NonNull @SchedulerSupport(value="none") public final <T> @NonNull Single<T> toSingleDefault(T completionValue)
toSingleDefault
does not operate by default on a particular Scheduler
.T
- the value typecompletionValue
- the value to emit when this Completable completes normallyNullPointerException
- if completionValue is null@CheckReturnValue @NonNull @SchedulerSupport(value="custom") public final @NonNull Completable unsubscribeOn(@NonNull @NonNull Scheduler scheduler)
unsubscribeOn
calls dispose() of the upstream on the Scheduler
you specify.scheduler
- the target scheduler where to execute the disposingNullPointerException
- if scheduler is null@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull TestObserver<Void> test()
test
does not operate by default on a particular Scheduler
.@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final @NonNull TestObserver<Void> test(boolean dispose)
dispose
- if true, the TestObserver will be cancelled before subscribing to this
Completable.
test
does not operate by default on a particular Scheduler
.@CheckReturnValue @SchedulerSupport(value="none") @NonNull public static @NonNull Completable fromCompletionStage(@NonNull @NonNull CompletionStage<?> stage)
CompletionStage
terminates.
Note that the operator takes an already instantiated, running or terminated CompletionStage
.
If the optional is to be created per consumer upon subscription, use defer(Supplier)
around fromCompletionStage
:
Maybe.defer(() -> Completable.fromCompletionStage(createCompletionStage()));
Canceling the flow can't cancel the execution of the CompletionStage
because CompletionStage
itself doesn't support cancellation. Instead, the operator detaches from the CompletionStage
.
fromCompletionStage
does not operate by default on a particular Scheduler
.stage
- the CompletionStage to convert to Maybe and signal its terminal value or error@CheckReturnValue @SchedulerSupport(value="none") @NonNull public final <T> @NonNull CompletionStage<T> toCompletionStage(@Nullable T defaultItem)
CompletionStage
.
The upstream can be canceled by converting the resulting CompletionStage
into
CompletableFuture
via CompletionStage.toCompletableFuture()
and
calling CompletableFuture.cancel(boolean)
on it.
The upstream will be also cancelled if the resulting CompletionStage
is converted to and
completed manually by CompletableFuture.complete(Object)
or CompletableFuture.completeExceptionally(Throwable)
.
CompletionStage
s don't have a notion of emptiness and allow null
s, therefore, one can either use
a defaultItem
of null
or turn the flow into a sequence of Optional
s and default to Optional.empty()
:
CompletionStage<Optional<T>> stage = source.map(Optional::of).toCompletionStage(Optional.empty());
toCompletionStage
does not operate by default on a particular Scheduler
.T
- the type of the default item to signal upon completiondefaultItem
- the item to signal if the upstream is empty