Class BehaviorSubject<T>
- Type Parameters:
T - the type of item expected to be observed by the Subject
- All Implemented Interfaces:
ObservableSource<T>, Observer<T>
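Subject that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.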
This subject does not have a public constructor by design; a new empty instance of this
BehaviorSubject can be created via the create() method and
a new non-empty instance can be created via createDefault(Object) (named as such to avoid
overload resolution conflict with Observable.create that creates an Observable, not a BehaviorSubject).
Since a Subject is conceptually derived from the Processor type in the Reactive Streams specification,
nulls are not allowed (Rule 2.13) as
default initial values in createDefault(Object) or as parameters to onNext(Object) and
onError(Throwable). Such calls will result in a
NullPointerException being thrown and the subject's state is not changed.
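The null handling described above can be sketched as follows. This is a minimal illustration that assumes RxJava 3 (package io.reactivex.rxjava3) is on the classpath; the class name NullRejectionSketch and its method names are hypothetical.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;

public class NullRejectionSketch {

    // Returns true if onNext(null) throws a NullPointerException
    // and the previously cached value is left untouched.
    static boolean onNextNullIsRejected() {
        BehaviorSubject<String> subject = BehaviorSubject.createDefault("initial");
        boolean thrown = false;
        try {
            subject.onNext(null);
        } catch (NullPointerException expected) {
            thrown = true;
        }
        return thrown
                && "initial".equals(subject.getValue())
                && !subject.hasThrowable()
                && !subject.hasComplete();
    }

    // Returns true if createDefault(null) throws a NullPointerException.
    static boolean nullDefaultIsRejected() {
        try {
            BehaviorSubject.<String>createDefault(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(onNextNullIsRejected());
        System.out.println(nullDefaultIsRejected());
    }
}
```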
Since a BehaviorSubject is an Observable, it does not support backpressure.
When this BehaviorSubject is terminated via onError(Throwable) or onComplete(), the
last observed item (if any) is cleared and late Observers only receive
the respective terminal event.
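As a small sketch of the behavior just described (assuming RxJava 3 on the classpath; TerminalStateSketch is a hypothetical name), a late Observer sees only the terminal event and the cached item is no longer readable:

```java
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.subjects.BehaviorSubject;

public class TerminalStateSketch {

    // Returns true if the cached item is gone once the subject has completed.
    static boolean valueClearedAfterComplete() {
        BehaviorSubject<Integer> subject = BehaviorSubject.createDefault(1);
        subject.onNext(2);
        subject.onComplete();

        // A late Observer receives no items, only onComplete.
        TestObserver<Integer> late = subject.test();
        late.assertResult();

        return !subject.hasValue()
                && subject.getValue() == null
                && subject.hasComplete();
    }

    public static void main(String[] args) {
        System.out.println(valueClearedAfterComplete());
    }
}
```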
The BehaviorSubject does not support clearing its cached value (to appear empty again); however, the
effect can be achieved by using a special item and making sure Observers subscribe through a
filter whose predicate filters out this special item:
BehaviorSubject<Integer> subject = BehaviorSubject.create();
final Integer EMPTY = Integer.MIN_VALUE;
Observable<Integer> observable = subject.filter(v -> v != EMPTY);
TestObserver<Integer> to1 = observable.test();
subject.onNext(1);
// this will "clear" the cache
subject.onNext(EMPTY);
TestObserver<Integer> to2 = observable.test();
subject.onNext(2);
subject.onComplete();
// to1 received both non-empty items
to1.assertResult(1, 2);
// to2 received only 2 even though the current item was EMPTY
// when it got subscribed
to2.assertResult(2);
// Observers coming after the subject was terminated receive
// no items and only the onComplete event in this case.
observable.test().assertResult();
Even though BehaviorSubject implements the Observer interface, calling
onSubscribe is not required (Rule 2.12)
if the subject is used as a standalone source. However, calling onSubscribe
after the BehaviorSubject reached its terminal state will result in the
given Disposable being disposed immediately.
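The onSubscribe behavior can be illustrated with a short sketch. It assumes RxJava 3, where Disposable.empty() creates a fresh, not yet disposed Disposable; the class name LateOnSubscribeSketch is hypothetical.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.BehaviorSubject;

public class LateOnSubscribeSketch {

    // Returns true if a Disposable handed over after termination is disposed right away.
    static boolean disposedAfterTerminal() {
        BehaviorSubject<String> subject = BehaviorSubject.create();
        subject.onComplete();

        Disposable upstream = Disposable.empty();
        subject.onSubscribe(upstream);
        return upstream.isDisposed();
    }

    // Returns true if a Disposable handed over to a still-active subject is left alone.
    static boolean keptWhileActive() {
        BehaviorSubject<String> subject = BehaviorSubject.create();

        Disposable upstream = Disposable.empty();
        subject.onSubscribe(upstream);
        return !upstream.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(disposedAfterTerminal());
        System.out.println(keptWhileActive());
    }
}
```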
Calling onNext(Object), onError(Throwable) and onComplete()
is required to be serialized (called from the same thread or called non-overlappingly from different threads
through external means of serialization). The Subject.toSerialized() method available to all Subjects
provides such serialization and also protects against reentrance (i.e., when a downstream Observer
consuming this subject also wants to call onNext(Object) on this subject recursively).
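One way to satisfy the serialization requirement is sketched below: two threads emit through the wrapper returned by toSerialized(). This assumes RxJava 3 on the classpath; SerializedEmissionSketch and emitFromTwoThreads are hypothetical names, and the item count is arbitrary.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import io.reactivex.rxjava3.subjects.Subject;
import java.util.concurrent.atomic.AtomicInteger;

public class SerializedEmissionSketch {

    // Emits perThread items from each of two threads and returns how many items
    // the single Observer received.
    static int emitFromTwoThreads(int perThread) {
        // The serialized wrapper makes concurrent onNext calls safe.
        Subject<Integer> serialized = BehaviorSubject.<Integer>create().toSerialized();

        AtomicInteger received = new AtomicInteger();
        serialized.subscribe(v -> received.incrementAndGet());

        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                serialized.onNext(i);
            }
        };
        Thread first = new Thread(producer);
        Thread second = new Thread(producer);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        serialized.onComplete();
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(emitFromTwoThreads(1000));
    }
}
```

Calling onNext directly on the unwrapped BehaviorSubject from both threads would violate the contract stated above.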
This BehaviorSubject supports the standard state-peeking methods hasComplete(), hasThrowable(),
getThrowable() and hasObservers() as well as means to read the latest observed value
in a non-blocking and thread-safe manner via hasValue() or getValue().
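The state-peeking methods can be exercised as in the following sketch, which records a snapshot at three points in a subject's lifecycle. It assumes RxJava 3 on the classpath; StatePeekSketch, describe and lifecycleSnapshots are hypothetical names.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;

public class StatePeekSketch {

    // Formats the current state of the subject using only the peeking methods.
    static String describe(BehaviorSubject<String> subject) {
        return "value=" + subject.getValue()
                + " hasValue=" + subject.hasValue()
                + " observers=" + subject.hasObservers()
                + " complete=" + subject.hasComplete()
                + " error=" + subject.hasThrowable();
    }

    // Snapshots taken when empty, after one item, and after completion.
    static String[] lifecycleSnapshots() {
        BehaviorSubject<String> subject = BehaviorSubject.create();
        String empty = describe(subject);
        subject.onNext("a");
        String withValue = describe(subject);
        subject.onComplete();
        String completed = describe(subject);
        return new String[] { empty, withValue, completed };
    }

    public static void main(String[] args) {
        for (String snapshot : lifecycleSnapshots()) {
            System.out.println(snapshot);
        }
    }
}
```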
- Scheduler:
BehaviorSubject does not operate by default on a particular Scheduler and the Observers get notified on the thread the respective onXXX methods were invoked.
- Error handling:
When the onError(Throwable) is called, the BehaviorSubject enters into a terminal state and emits the same Throwable instance to the last set of Observers. During this emission, if one or more Observers dispose their respective Disposables, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable) (multiple times if multiple Observers cancel at once). If there were no Observers subscribed to this BehaviorSubject when the onError() was called, the global error handler is not invoked.
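The Scheduler note above can be illustrated with a sketch in which onNext is called from a named thread and the Observer records the thread it was notified on. This assumes RxJava 3 on the classpath; CallerThreadSketch and observedThreadName are hypothetical names.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import java.util.concurrent.atomic.AtomicReference;

public class CallerThreadSketch {

    // Calls onNext from a thread with the given name and returns the name of
    // the thread on which the Observer saw the item.
    static String observedThreadName(String emitterName) {
        BehaviorSubject<Integer> subject = BehaviorSubject.create();
        AtomicReference<String> seen = new AtomicReference<>();
        subject.subscribe(v -> seen.set(Thread.currentThread().getName()));

        Thread emitter = new Thread(() -> subject.onNext(1), emitterName);
        emitter.start();
        try {
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(observedThreadName("emitter-thread"));
    }
}
```

To move notifications to another thread, an operator such as observeOn would have to be applied downstream of the subject.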
Example usage:
// observer will receive all 4 events (including "default").
BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
// observer will receive only onComplete
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onComplete();
subject.subscribe(observer);
// observer will receive only onError
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onError(new RuntimeException("error"));
subject.subscribe(observer);
-
Method Summary
Modifier and Type | Method | Description
static <T> @NonNull BehaviorSubject<T> | create() | Creates a BehaviorSubject without a default item.
static <@NonNull T> @NonNull BehaviorSubject<T> | createDefault(@NonNull T defaultValue) | Creates a BehaviorSubject that emits the last item it observed and all subsequent items to each Observer that subscribes to it.
Throwable | getThrowable() | Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.
T | getValue() | Returns a single value the Subject currently has or null if no such value exists.
boolean | hasComplete() | Returns true if the subject has reached a terminal state through a complete event.
boolean | hasObservers() | Returns true if the subject has any Observers.
boolean | hasThrowable() | Returns true if the subject has reached a terminal state through an error event.
boolean | hasValue() | Returns true if the subject has any value.
void | onComplete() | Notifies the Observer that the Observable has finished sending push-based notifications.
void | onError(Throwable t) | Notifies the Observer that the Observable has experienced an error condition.
void | onNext(T t) | Provides the Observer with a new item to observe.
void | onSubscribe(Disposable d) | Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner.
protected void | subscribeActual(Observer<? super T> observer) | Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.
Methods inherited from class Subject
toSerialized
Methods inherited from class Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, 
delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, 
replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, 
withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
Method Details
-
create
Creates a BehaviorSubject without a default item.
- Type Parameters:
T - the type of item the Subject will emit
- Returns:
the constructed BehaviorSubject
-
createDefault
@CheckReturnValue @NonNull public static <@NonNull T> @NonNull BehaviorSubject<T> createDefault(@NonNull T defaultValue)
Creates a BehaviorSubject that emits the last item it observed and all subsequent items to each Observer that subscribes to it.
- Type Parameters:
T - the type of item the Subject will emit
- Parameters:
defaultValue - the item that will be emitted first to any Observer as long as the BehaviorSubject has not yet observed any items from its source Observable
- Returns:
the constructed BehaviorSubject
- Throws:
NullPointerException - if defaultValue is null
-
subscribeActual
Description copied from class: Observable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.
There is no need to call any of the plugin hooks on the current Observable instance or the Observer; all hooks and basic safeguards have been applied by Observable.subscribe(Observer) before this method gets called.
- Specified by:
subscribeActual in class Observable<T>
- Parameters:
observer - the incoming Observer, never null
-
onSubscribe
Description copied from interface: Observer
Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner.
- Parameters:
d - the Disposable instance whose Disposable.dispose() can be called anytime to cancel the connection
-
onNext
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onComplete() or Observer.onError(Throwable).
- Parameters:
t - the item emitted by the Observable
-
onError
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onComplete().
- Parameters:
t - the exception encountered by the Observable
-
onComplete
public void onComplete()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(Throwable).
-
hasObservers
Description copied from class: Subject
Returns true if the subject has any Observers.
The method is thread-safe.
- Specified by:
hasObservers in class Subject<T>
- Returns:
true if the subject has any Observers
-
getThrowable
Description copied from class: Subject
Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.
The method is thread-safe.
- Specified by:
getThrowable in class Subject<T>
- Returns:
the error that caused the Subject to terminate or null if the Subject hasn't terminated yet
-
getValue
Returns a single value the Subject currently has or null if no such value exists.
The method is thread-safe.
- Returns:
a single value the Subject currently has or null if no such value exists
-
hasComplete
Description copied from class: Subject
Returns true if the subject has reached a terminal state through a complete event.
The method is thread-safe.
- Specified by:
hasComplete in class Subject<T>
- Returns:
true if the subject has reached a terminal state through a complete event
-
hasThrowable
Description copied from class: Subject
Returns true if the subject has reached a terminal state through an error event.
The method is thread-safe.
- Specified by:
hasThrowable in class Subject<T>
- Returns:
true if the subject has reached a terminal state through an error event
-
hasValue
Returns true if the subject has any value.
The method is thread-safe.
- Returns:
true if the subject has any value
-