T
- the value type received and emitted by this Subject subclasspublic final class UnicastSubject<T> extends Subject<T>
Observer
subscribes to it, replays
those events to it until the Observer
catches up and then switches to relaying events live to
this single Observer
until this UnicastSubject
terminates or the Observer
disposes.
Note that UnicastSubject
holds an unbounded internal buffer.
This subject does not have a public constructor by design; a new empty instance of this
UnicastSubject
can be created via the following create
methods that
allow specifying the retention policy for items:
create()
- creates an empty, unbounded UnicastSubject
that
caches all items and the terminal event it receives.create(int)
- creates an empty, unbounded UnicastSubject
with a hint about how many total items one expects to retain.create(boolean)
- creates an empty, unbounded UnicastSubject
that
optionally delays an error it receives and replays it after the regular items have been emitted.create(int, Runnable)
- creates an empty, unbounded UnicastSubject
with a hint about how many total items one expects to retain and a callback that will be
called exactly once when the UnicastSubject
gets terminated or the single Observer
disposes.create(int, Runnable, boolean)
- creates an empty, unbounded UnicastSubject
with a hint about how many total items one expects to retain and a callback that will be
called exactly once when the UnicastSubject
gets terminated or the single Observer
disposes
and optionally delays an error it receives and replays it after the regular items have been emitted.
If more than one Observer
attempts to subscribe to this UnicastSubject
, they
will receive an IllegalStateException
indicating the single-use-only nature of this UnicastSubject
,
even if the UnicastSubject
already terminated with an error.
Since a Subject
is conceptionally derived from the Processor
type in the Reactive Streams specification,
null
s are not allowed (Rule 2.13) as
parameters to onNext(Object)
and onError(Throwable)
. Such calls will result in a
NullPointerException
being thrown and the subject's state is not changed.
Since a UnicastSubject
is an Observable
, it does not support backpressure.
When this UnicastSubject
is terminated via onError(Throwable)
the current or late single Observer
may receive the Throwable
before any available items could be emitted. To make sure an onError event is delivered
to the Observer
after the normal items, create a UnicastSubject
with the create(boolean)
or
create(int, Runnable, boolean)
factory methods.
Even though UnicastSubject
implements the Observer
interface, calling
onSubscribe
is not required (Rule 2.12)
if the subject is used as a standalone source. However, calling onSubscribe
after the UnicastSubject
reached its terminal state will result in the
given Disposable
being disposed immediately.
Calling onNext(Object)
, onError(Throwable)
and onComplete()
is required to be serialized (called from the same thread or called non-overlappingly from different threads
through external means of serialization). The Subject.toSerialized()
method available to all Subject
s
provides such serialization and also protects against reentrance (i.e., when a downstream Observer
consuming this subject also wants to call onNext(Object)
on this subject recursively).
This UnicastSubject
supports the standard state-peeking methods hasComplete()
, hasThrowable()
,
getThrowable()
and hasObservers()
.
UnicastSubject
does not operate by default on a particular Scheduler
and
the single Observer
gets notified on the thread the respective onXXX
methods were invoked.onError(Throwable)
is called, the UnicastSubject
enters into a terminal state
and emits the same Throwable
instance to the current single Observer
. During this emission,
if the single Observer
s disposes its respective Disposable
, the
Throwable
is delivered to the global error handler via
RxJavaPlugins.onError(Throwable)
.
If there were no Observer
s subscribed to this UnicastSubject
when the onError()
was called, the global error handler is not invoked.
Example usage:
UnicastSubject<Integer> subject = UnicastSubject.create();
TestObserver<Integer> to1 = subject.test();
// fresh UnicastSubjects are empty
to1.assertEmpty();
TestObserver<Integer> to2 = subject.test();
// A UnicastSubject only allows one Observer during its lifetime
to2.assertFailure(IllegalStateException.class);
subject.onNext(1);
to1.assertValue(1);
subject.onNext(2);
to1.assertValues(1, 2);
subject.onComplete();
to1.assertResult(1, 2);
// ----------------------------------------------------
UnicastSubject<Integer> subject2 = UnicastSubject.create();
// a UnicastSubject caches events until its single Observer subscribes
subject2.onNext(1);
subject2.onNext(2);
subject2.onComplete();
TestObserver<Integer> to3 = subject2.test();
// the cached events are emitted in order
to3.assertResult(1, 2);
Modifier and Type | Method and Description |
---|---|
static <T> @NonNull UnicastSubject<T> |
create()
Creates an UnicastSubject with an internal buffer capacity hint 16.
|
static <T> @NonNull UnicastSubject<T> |
create(boolean delayError)
Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag.
|
static <T> @NonNull UnicastSubject<T> |
create(int capacityHint)
Creates an UnicastSubject with the given internal buffer capacity hint.
|
static <T> @NonNull UnicastSubject<T> |
create(int capacityHint,
@NonNull Runnable onTerminate)
Creates an UnicastSubject with the given internal buffer capacity hint and a callback for
the case when the single Subscriber cancels its subscription
or the subject is terminated.
|
static <T> @NonNull UnicastSubject<T> |
create(int capacityHint,
@NonNull Runnable onTerminate,
boolean delayError)
Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and
a callback for the case when the single Observer disposes its
Disposable
or the subject is terminated. |
@Nullable Throwable |
getThrowable()
Returns the error that caused the Subject to terminate or null if the Subject
hasn't terminated yet.
|
boolean |
hasComplete()
Returns true if the subject has reached a terminal state through a complete event.
|
boolean |
hasObservers()
Returns true if the subject has any Observers.
|
boolean |
hasThrowable()
Returns true if the subject has reached a terminal state through an error event.
|
void |
onComplete()
Notifies the
Observer that the Observable has finished sending push-based notifications. |
void |
onError(Throwable t)
Notifies the
Observer that the Observable has experienced an error condition. |
void |
onNext(T t)
Provides the
Observer with a new item to observe. |
void |
onSubscribe(Disposable d)
Provides the
Observer with the means of cancelling (disposing) the
connection (channel) with the Observable in both
synchronous (from within Observer.onNext(Object) ) and asynchronous manner. |
protected void |
subscribeActual(Observer<? super T> observer)
Operator implementations (both source and intermediate) should implement this method that
performs the necessary business logic and handles the incoming
Observer s. |
toSerialized
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create()
T
- the value type@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint)
T
- the value typecapacityHint
- the hint to size the internal unbounded bufferIllegalArgumentException
- if capacityHint
is non-positive@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint, @NonNull Runnable onTerminate)
The callback, if not null, is called exactly once and non-overlapped with any active replay.
T
- the value typecapacityHint
- the hint to size the internal unbounded bufferonTerminate
- the callback to run when the Subject is terminated or cancelled, null not allowedNullPointerException
- if onTerminate
is null
IllegalArgumentException
- if capacityHint
is non-positive@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint, @NonNull Runnable onTerminate, boolean delayError)
Disposable
or the subject is terminated.
The callback, if not null, is called exactly once and non-overlapped with any active replay.
History: 2.0.8 - experimental
T
- the value typecapacityHint
- the hint to size the internal unbounded bufferonTerminate
- the callback to run when the Subject is terminated or cancelled, null not alloweddelayError
- deliver pending onNext events before onErrorNullPointerException
- if onTerminate
is null
IllegalArgumentException
- if capacityHint
is non-positive@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(boolean delayError)
The callback, if not null, is called exactly once and non-overlapped with any active replay.
History: 2.0.8 - experimental
T
- the value typedelayError
- deliver pending onNext events before onErrorprotected void subscribeActual(Observer<? super T> observer)
Observable
Observer
s.
There is no need to call any of the plugin hooks on the current Observable
instance or
the Observer
; all hooks and basic safeguards have been
applied by Observable.subscribe(Observer)
before this method gets called.
subscribeActual
in class Observable<T>
observer
- the incoming Observer
, never null
public void onSubscribe(Disposable d)
Observer
Observer
with the means of cancelling (disposing) the
connection (channel) with the Observable
in both
synchronous (from within Observer.onNext(Object)
) and asynchronous manner.d
- the Disposable
instance whose Disposable.dispose()
can
be called anytime to cancel the connectionpublic void onNext(T t)
Observer
Observer
with a new item to observe.
The Observable
may call this method 0 or more times.
The Observable
will not call this method again after it calls either Observer.onComplete()
or
Observer.onError(java.lang.Throwable)
.
t
- the item emitted by the Observablepublic void onError(Throwable t)
Observer
Observer
that the Observable
has experienced an error condition.
If the Observable
calls this method, it will not thereafter call Observer.onNext(T)
or
Observer.onComplete()
.
t
- the exception encountered by the Observablepublic void onComplete()
Observer
Observer
that the Observable
has finished sending push-based notifications.
The Observable
will not call this method if it calls Observer.onError(java.lang.Throwable)
.
@CheckReturnValue public boolean hasObservers()
Subject
The method is thread-safe.
hasObservers
in class Subject<T>
@Nullable @CheckReturnValue public @Nullable Throwable getThrowable()
Subject
The method is thread-safe.
getThrowable
in class Subject<T>
@CheckReturnValue public boolean hasThrowable()
Subject
The method is thread-safe.
hasThrowable
in class Subject<T>
Subject.getThrowable()
,
Subject.hasComplete()
@CheckReturnValue public boolean hasComplete()
Subject
The method is thread-safe.
hasComplete
in class Subject<T>
Subject.hasThrowable()