Class UnicastSubject<T>
- Type Parameters:
T- the value type received and emitted by this Subject subclass
- All Implemented Interfaces:
ObservableSource<T>, Observer<T>
A Subject that queues up events until a single Observer subscribes to it, replays
those events to it until the Observer catches up and then switches to relaying events live to
this single Observer until this UnicastSubject terminates or the Observer disposes.
Note that UnicastSubject holds an unbounded internal buffer.
This subject does not have a public constructor by design; a new empty instance of this
UnicastSubject can be created via the following create methods that
allow specifying the retention policy for items:
- create() - creates an empty, unbounded UnicastSubject that caches all items and the terminal event it receives.
- create(int) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain.
- create(boolean) - creates an empty, unbounded UnicastSubject that optionally delays an error it receives and replays it after the regular items have been emitted.
- create(int, Runnable) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastSubject gets terminated or the single Observer disposes.
- create(int, Runnable, boolean) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastSubject gets terminated or the single Observer disposes, and optionally delays an error it receives and replays it after the regular items have been emitted.
If more than one Observer attempts to subscribe to this UnicastSubject, they
will receive an IllegalStateException indicating the single-use-only nature of this UnicastSubject,
even if the UnicastSubject already terminated with an error.
Since a Subject is conceptually derived from the Processor type in the Reactive Streams specification,
nulls are not allowed (Rule 2.13) as
parameters to onNext(Object) and onError(Throwable). Such calls will result in a
NullPointerException being thrown and the subject's state is not changed.
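The null rule above can be illustrated with a minimal sketch. It assumes the RxJava 3 package layout (`io.reactivex.rxjava3.*`); the class and method names are hypothetical and exist only for this illustration.

```java
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.List;

public class NullRejectionDemo {

    /** Emits around a rejected null and returns what a late Observer receives. */
    public static List<String> emitAroundNull() {
        UnicastSubject<String> subject = UnicastSubject.create();
        subject.onNext("a");
        try {
            subject.onNext(null); // violates the non-null contract
            throw new AssertionError("expected a NullPointerException");
        } catch (NullPointerException expected) {
            // the call was rejected; the subject's state is unchanged
        }
        subject.onNext("b");
        subject.onComplete();
        // the single Observer subscribes late and drains the buffered items
        return subject.toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(emitAroundNull());
    }
}
```

Because the rejected call leaves the subject untouched, the items emitted before and after it should both reach the Observer.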
Since a UnicastSubject is an Observable, it does not support backpressure.
When this UnicastSubject is terminated via onError(Throwable) the current or late single Observer
may receive the Throwable before any available items could be emitted. To make sure an onError event is delivered
to the Observer after the normal items, create a UnicastSubject with the create(boolean) or
create(int, Runnable, boolean) factory methods.
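As a hedged sketch of the difference, the snippet below buffers two items and an error before the single Observer subscribes. It assumes the RxJava 3 package names; `DelayErrorDemo` and `valuesSeenByLateObserver` are hypothetical names chosen for this example.

```java
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.io.IOException;
import java.util.List;

public class DelayErrorDemo {

    /** Returns the items a late Observer received before the error arrived. */
    public static List<Integer> valuesSeenByLateObserver(boolean delayError) {
        UnicastSubject<Integer> subject = UnicastSubject.create(delayError);
        subject.onNext(1);
        subject.onNext(2);
        subject.onError(new IOException("boom"));

        TestObserver<Integer> to = subject.test();
        to.assertError(IOException.class); // the error is delivered either way
        return to.values();
    }

    public static void main(String[] args) {
        System.out.println("delayError=true:  " + valuesSeenByLateObserver(true));
        System.out.println("delayError=false: " + valuesSeenByLateObserver(false));
    }
}
```

With `delayError` set to `true`, the buffered items are replayed before the error. With `false`, the Observer may see the error without some or all of the buffered items.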
Even though UnicastSubject implements the Observer interface, calling
onSubscribe is not required (Rule 2.12)
if the subject is used as a standalone source. However, calling onSubscribe
after the UnicastSubject reached its terminal state will result in the
given Disposable being disposed immediately.
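The onSubscribe behavior described above could be checked with a small sketch like the following. It assumes RxJava 3, where `Disposable.empty()` creates a fresh disposable container; the class and method names are hypothetical.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.UnicastSubject;

public class LateOnSubscribeDemo {

    /** Hands a Disposable to the subject, optionally after it has terminated. */
    public static boolean isDisposedAfterOnSubscribe(boolean terminateFirst) {
        UnicastSubject<Integer> subject = UnicastSubject.create();
        if (terminateFirst) {
            subject.onComplete(); // the subject reaches its terminal state
        }
        Disposable upstream = Disposable.empty();
        subject.onSubscribe(upstream);
        return upstream.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println("live subject:       " + isDisposedAfterOnSubscribe(false));
        System.out.println("terminated subject: " + isDisposedAfterOnSubscribe(true));
    }
}
```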
Calling onNext(Object), onError(Throwable) and onComplete()
is required to be serialized (called from the same thread or called non-overlappingly from different threads
through external means of serialization). The Subject.toSerialized() method available to all Subjects
provides such serialization and also protects against reentrance (i.e., when a downstream Observer
consuming this subject also wants to call onNext(Object) on this subject recursively).
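A minimal sketch of the serialization requirement, assuming RxJava 3 package names: two plain threads emit through the wrapper returned by `toSerialized()` rather than through the raw subject. The class name, method name, and item counts are hypothetical.

```java
import io.reactivex.rxjava3.subjects.Subject;
import io.reactivex.rxjava3.subjects.UnicastSubject;

public class SerializedEmitDemo {

    /** Emits perThread items from each of two threads; returns how many arrived. */
    public static int emitFromTwoThreads(int perThread) throws InterruptedException {
        // The serialized wrapper makes concurrent onNext calls non-overlapping.
        Subject<Integer> serialized = UnicastSubject.<Integer>create().toSerialized();

        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                serialized.onNext(i);
            }
        };
        Thread first = new Thread(producer);
        Thread second = new Thread(producer);
        first.start();
        second.start();
        first.join();
        second.join();

        serialized.onComplete();
        // The single Observer subscribes late and receives the buffered items.
        return serialized.toList().blockingGet().size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(emitFromTwoThreads(1_000));
    }
}
```

Calling `onNext` on the unwrapped `UnicastSubject` from both threads at once would violate the contract stated above, so the raw subject is never shared between the threads here.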
This UnicastSubject supports the standard state-peeking methods hasComplete(), hasThrowable(),
getThrowable() and hasObservers().
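The state-peeking methods can be exercised with a sketch such as the one below, which assumes RxJava 3 package names. `StatePeekDemo` and its `snapshot` helper are hypothetical and only format the values for printing.

```java
import io.reactivex.rxjava3.subjects.UnicastSubject;

public class StatePeekDemo {

    /** Formats the boolean state-peeking methods of a subject into one line. */
    public static String snapshot(UnicastSubject<?> subject) {
        return "observers=" + subject.hasObservers()
                + " complete=" + subject.hasComplete()
                + " throwable=" + subject.hasThrowable();
    }

    public static void main(String[] args) {
        UnicastSubject<Integer> subject = UnicastSubject.create();
        System.out.println("fresh:      " + snapshot(subject));

        // Subscribe the single Observer, with an error handler so onError is consumed.
        subject.subscribe(item -> { }, error -> { });
        System.out.println("subscribed: " + snapshot(subject));

        subject.onError(new RuntimeException("boom"));
        System.out.println("failed:     " + snapshot(subject));
        System.out.println("cause:      " + subject.getThrowable());
    }
}
```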
- Scheduler:
UnicastSubject does not operate by default on a particular Scheduler, and the single Observer gets notified on the thread on which the respective onXXX methods were invoked.
- Error handling:
When onError(Throwable) is called, the UnicastSubject enters a terminal state and emits the same Throwable instance to the current single Observer. If the single Observer disposes its respective Disposable during this emission, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable). If no Observer was subscribed to this UnicastSubject when onError() was called, the global error handler is not invoked.
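The last point of the error-handling note, that an onError call without a subscribed Observer bypasses the global error handler, might be observed with the following sketch. It assumes RxJava 3 package names and temporarily replaces the global handler, which affects the whole JVM, so it is only suitable for an isolated demo. The class and method names are hypothetical.

```java
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class UnobservedErrorDemo {

    /** Returns the errors routed to the global handler during the demo. */
    public static List<Throwable> errorsSeenByGlobalHandler() {
        List<Throwable> routed = new CopyOnWriteArrayList<>();
        RxJavaPlugins.setErrorHandler(routed::add); // record instead of printing
        try {
            UnicastSubject<Integer> subject = UnicastSubject.create();
            // No Observer is subscribed yet: the error is only stored.
            subject.onError(new IllegalStateException("boom"));
            // A late Observer still receives the stored error.
            subject.test().assertError(IllegalStateException.class);
        } finally {
            RxJavaPlugins.setErrorHandler(null); // restore the default handler
        }
        return routed;
    }

    public static void main(String[] args) {
        System.out.println("errors routed globally: " + errorsSeenByGlobalHandler().size());
    }
}
```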
Example usage:
UnicastSubject<Integer> subject = UnicastSubject.create();
TestObserver<Integer> to1 = subject.test();
// fresh UnicastSubjects are empty
to1.assertEmpty();
TestObserver<Integer> to2 = subject.test();
// A UnicastSubject only allows one Observer during its lifetime
to2.assertFailure(IllegalStateException.class);
subject.onNext(1);
to1.assertValue(1);
subject.onNext(2);
to1.assertValues(1, 2);
subject.onComplete();
to1.assertResult(1, 2);
// ----------------------------------------------------
UnicastSubject<Integer> subject2 = UnicastSubject.create();
// a UnicastSubject caches events until its single Observer subscribes
subject2.onNext(1);
subject2.onNext(2);
subject2.onComplete();
TestObserver<Integer> to3 = subject2.test();
// the cached events are emitted in order
to3.assertResult(1, 2);
- Since:
- 2.0
-
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| static <T> @NonNull UnicastSubject<T> | create() | Creates a UnicastSubject with an internal buffer capacity hint of 16. |
| static <T> @NonNull UnicastSubject<T> | create(boolean delayError) | Creates a UnicastSubject with an internal buffer capacity hint of 16 and the given delay error flag. |
| static <T> @NonNull UnicastSubject<T> | create(int capacityHint) | Creates a UnicastSubject with the given internal buffer capacity hint. |
| static <T> @NonNull UnicastSubject<T> | create(int capacityHint, Runnable onTerminate) | Creates a UnicastSubject with the given internal buffer capacity hint and a callback for the case when the single Observer disposes its Disposable or the subject is terminated. |
| static <T> @NonNull UnicastSubject<T> | create(int capacityHint, Runnable onTerminate, boolean delayError) | Creates a UnicastSubject with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Observer disposes its Disposable or the subject is terminated. |
| Throwable | getThrowable() | Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet. |
| boolean | hasComplete() | Returns true if the subject has reached a terminal state through a complete event. |
| boolean | hasObservers() | Returns true if the subject has any Observers. |
| boolean | hasThrowable() | Returns true if the subject has reached a terminal state through an error event. |
| void | onComplete() | Notifies the Observer that the Observable has finished sending push-based notifications. |
| void | onError(Throwable t) | Notifies the Observer that the Observable has experienced an error condition. |
| void | onNext(T t) | Provides the Observer with a new item to observe. |
| void | onSubscribe(Disposable d) | Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner. |
| protected void | subscribeActual(Observer<? super T> observer) | Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers. |

Methods inherited from class Subject
toSerialized
Methods inherited from class Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, 
delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, 
replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, 
withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
Method Details
-
create
Creates a UnicastSubject with an internal buffer capacity hint of 16.
- Type Parameters:
T - the value type
- Returns:
- a UnicastSubject instance
-
create
Creates a UnicastSubject with the given internal buffer capacity hint.
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
- Returns:
- a UnicastSubject instance
- Throws:
IllegalArgumentException - if capacityHint is non-positive
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint, @NonNull Runnable onTerminate)
Creates a UnicastSubject with the given internal buffer capacity hint and a callback for the case when the single Observer disposes its Disposable or the subject is terminated.
The callback is called exactly once and does not overlap with any active replay.
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
onTerminate - the callback to run when the Subject is terminated or disposed, null not allowed
- Returns:
- a UnicastSubject instance
- Throws:
NullPointerException - if onTerminate is null
IllegalArgumentException - if capacityHint is non-positive
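To illustrate the exactly-once guarantee of the callback, here is a minimal sketch assuming RxJava 3 package names. The counter-based callback and the class and method names are hypothetical.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.concurrent.atomic.AtomicInteger;

public class OnTerminateDemo {

    /** Disposes the single Observer, then completes the subject; returns the call count. */
    public static int callbackRunsAfterDisposeThenComplete() {
        AtomicInteger calls = new AtomicInteger();
        // 16 is just a sizing hint for the internal buffer.
        UnicastSubject<Integer> subject = UnicastSubject.create(16, calls::incrementAndGet);

        Disposable subscription = subject.subscribe();
        subscription.dispose(); // first trigger: the single Observer disposes
        subject.onComplete();   // second trigger: must not run the callback again
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("callback invocations: " + callbackRunsAfterDisposeThenComplete());
    }
}
```

Even though both a dispose and a terminal event occur, the counter should end at one.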
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint, @NonNull Runnable onTerminate, boolean delayError)
Creates a UnicastSubject with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Observer disposes its Disposable or the subject is terminated.
The callback is called exactly once and does not overlap with any active replay.
History: 2.0.8 - experimental
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
onTerminate - the callback to run when the Subject is terminated or disposed, null not allowed
delayError - deliver pending onNext events before onError
- Returns:
- a UnicastSubject instance
- Throws:
NullPointerException - if onTerminate is null
IllegalArgumentException - if capacityHint is non-positive
- Since:
- 2.2
-
create
Creates a UnicastSubject with an internal buffer capacity hint of 16 and the given delay error flag.
History: 2.0.8 - experimental
- Type Parameters:
T - the value type
- Parameters:
delayError - deliver pending onNext events before onError
- Returns:
- a UnicastSubject instance
- Since:
- 2.2
-
subscribeActual
Description copied from class: Observable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.
There is no need to call any of the plugin hooks on the current Observable instance or the Observer; all hooks and basic safeguards have been applied by Observable.subscribe(Observer) before this method gets called.
- Specified by:
subscribeActual in class Observable<T>
- Parameters:
observer - the incoming Observer, never null
-
onSubscribe
Description copied from interface: Observer
Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner.
- Parameters:
d - the Disposable instance whose Disposable.dispose() can be called anytime to cancel the connection
-
onNext
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onComplete() or Observer.onError(Throwable).
- Parameters:
t - the item emitted by the Observable
-
onError
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onComplete().
- Parameters:
t - the exception encountered by the Observable
-
onComplete
public void onComplete()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(Throwable).
-
hasObservers
Description copied from class: Subject
Returns true if the subject has any Observers.
The method is thread-safe.
- Specified by:
hasObservers in class Subject<T>
- Returns:
- true if the subject has any Observers
-
getThrowable
Description copied from class: Subject
Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.
The method is thread-safe.
- Specified by:
getThrowable in class Subject<T>
- Returns:
- the error that caused the Subject to terminate or null if the Subject hasn't terminated yet
-
hasThrowable
Description copied from class: Subject
Returns true if the subject has reached a terminal state through an error event.
The method is thread-safe.
- Specified by:
hasThrowable in class Subject<T>
- Returns:
- true if the subject has reached a terminal state through an error event
-
hasComplete
Description copied from class: Subject
Returns true if the subject has reached a terminal state through a complete event.
The method is thread-safe.
- Specified by:
hasComplete in class Subject<T>
- Returns:
- true if the subject has reached a terminal state through a complete event
-