T - the value type
public final class ReplaySubject<T> extends Subject<T>
Replays events (in a configurable bounded or unbounded manner) to current and late Observers.
This subject does not have a public constructor by design; a new empty instance of this ReplaySubject can be created via the following create methods that allow specifying the retention policy for items:
create() - creates an empty, unbounded ReplaySubject that caches all items and the terminal event it receives.
create(int) - creates an empty, unbounded ReplaySubject with a hint about how many total items one expects to retain.
createWithSize(int) - creates an empty, size-bound ReplaySubject that retains at most the given number of the latest items it receives.
createWithTime(long, TimeUnit, Scheduler) - creates an empty, time-bound ReplaySubject that retains items no older than the specified time amount.
createWithTimeAndSize(long, TimeUnit, Scheduler, int) - creates an empty, time- and size-bound ReplaySubject that retains at most the given number of items that are also not older than the specified time amount.
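The difference between the unbounded and the size-bound policy can be sketched as follows. This is a minimal illustration that assumes RxJava 2.x (the io.reactivex packages) is on the classpath; the class name RetentionPolicies and the helper lateReplay are made up for the example.

```java
import io.reactivex.subjects.ReplaySubject;
import java.util.List;

public class RetentionPolicies {
    // Emits three items, then returns what an Observer subscribing afterwards gets replayed.
    static List<String> lateReplay(ReplaySubject<String> subject) {
        subject.onNext("one");
        subject.onNext("two");
        subject.onNext("three");
        // test() subscribes a TestObserver that records everything it receives
        return subject.test().values();
    }

    public static void main(String[] args) {
        // unbounded: every item is retained
        System.out.println(lateReplay(ReplaySubject.<String>create()));          // [one, two, three]
        // size-bound: only the latest two items are retained
        System.out.println(lateReplay(ReplaySubject.<String>createWithSize(2))); // [two, three]
    }
}
```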
Since a Subject is conceptually derived from the Processor type in the Reactive Streams specification, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls result in a NullPointerException being thrown, and the subject's state is not changed.
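A small sketch of the null check described above, assuming RxJava 2.x on the classpath; the class name NullRejection is illustrative only.

```java
import io.reactivex.subjects.ReplaySubject;

public class NullRejection {
    // Returns true if onNext(null) throws and leaves the subject untouched.
    static boolean rejectsNullAndKeepsState() {
        ReplaySubject<String> subject = ReplaySubject.create();
        try {
            subject.onNext(null);
            return false; // not expected: null should have been rejected
        } catch (NullPointerException expected) {
            // nothing was cached and the subject did not terminate
            return !subject.hasValue() && !subject.hasComplete() && !subject.hasThrowable();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsNullAndKeepsState());
    }
}
```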
Since a ReplaySubject is an Observable, it does not support backpressure.
When this ReplaySubject is terminated via onError(Throwable) or onComplete(), late Observers will receive the retained/cached items first (if any) followed by the respective terminal event. If the ReplaySubject has a time-bound, the age of the retained/cached items is still considered when replaying and thus it may result in no items being emitted before the terminal event.
Once an Observer has subscribed, it will receive items continuously from that point on. Bounds only affect how many past items a new Observer will receive before it catches up with the live event feed.
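As an illustration of replay after termination, the sketch below terminates a size-bound subject with an error and then subscribes. It assumes RxJava 2.x on the classpath; the class name and the exception message are placeholders.

```java
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.ReplaySubject;

public class LateObserverAfterError {
    // Subscribes only after the subject has already terminated with an error.
    static TestObserver<Integer> subscribeLate() {
        ReplaySubject<Integer> subject = ReplaySubject.createWithSize(2);
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);                                  // pushes 1 out of the replay window
        subject.onError(new IllegalStateException("boom")); // the terminal event is cached as well
        return subject.test();
    }

    public static void main(String[] args) {
        TestObserver<Integer> late = subscribeLate();
        System.out.println(late.values());     // [2, 3]
        System.out.println(late.errorCount()); // 1
    }
}
```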
Even though ReplaySubject implements the Observer interface, calling onSubscribe is not required (Rule 2.12) if the subject is used as a standalone source. However, calling onSubscribe after the ReplaySubject reached its terminal state will result in the given Disposable being disposed immediately.
Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).
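One possible way to feed a ReplaySubject from several threads is to emit through the serialized wrapper while keeping the original reference for reading the cache. The sketch assumes RxJava 2.x on the classpath; the class name and the two-producer setup are illustrative.

```java
import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;

public class SerializedEmission {
    // Two threads emit concurrently through the serialized view; returns the cached item count.
    static int emitFromTwoThreads(int perThread) {
        ReplaySubject<Integer> replay = ReplaySubject.create();
        Subject<Integer> serialized = replay.toSerialized();
        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                serialized.onNext(i); // safe to call concurrently on the wrapper
            }
        };
        Thread a = new Thread(producer);
        Thread b = new Thread(producer);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return replay.getValues().length;
    }

    public static void main(String[] args) {
        System.out.println(emitFromTwoThreads(1000)); // 2000
    }
}
```

Calling onNext directly on the unwrapped subject from both threads would violate the serialization requirement stated above.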
This ReplaySubject supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasObservers() as well as means to read the retained/cached items in a non-blocking and thread-safe manner via hasValue(), getValue(), getValues() or getValues(Object[]).
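The state-peeking methods can be exercised without any Observer, as in this sketch. It assumes RxJava 2.x on the classpath; the class name StatePeeking and the space-separated log format are made up for the example.

```java
import io.reactivex.subjects.ReplaySubject;
import java.util.Arrays;

public class StatePeeking {
    // Records the peeked state before emission, after two items, and after completion.
    static String peek() {
        ReplaySubject<String> subject = ReplaySubject.create();
        StringBuilder log = new StringBuilder();
        log.append(subject.hasValue()).append(' ');                    // nothing cached yet
        subject.onNext("a");
        subject.onNext("b");
        log.append(subject.getValue()).append(' ');                    // latest cached item
        log.append(Arrays.toString(subject.getValues())).append(' ');  // snapshot of all items
        subject.onComplete();
        log.append(subject.hasComplete()).append(' ')
           .append(subject.hasThrowable()).append(' ')
           .append(subject.getThrowable()).append(' ')
           .append(subject.hasObservers());
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(peek()); // false b [a, b] true false null false
    }
}
```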
Note that due to concurrency requirements, a size- and time-bounded ReplaySubject may hold strong references to more source emissions than specified while it isn't terminated yet. Use cleanupBuffer() to allow such inaccessible items to be cleaned up by GC once no consumer references them anymore.
Scheduler:
ReplaySubject does not operate by default on a particular Scheduler and the Observers get notified on the thread the respective onXXX methods were invoked. Time-bound ReplaySubjects use the given Scheduler in their create methods as the time source to timestamp items received for the age checks.
Error handling:
When onError(Throwable) is called, the ReplaySubject enters into a terminal state and emits the same Throwable instance to the last set of Observers. During this emission, if one or more Observers dispose their respective Disposables, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable) (multiple times if multiple Observers cancel at once). If there were no Observers subscribed to this ReplaySubject when onError() was called, the global error handler is not invoked.
Example usage:
ReplaySubject<Object> subject = ReplaySubject.create();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onComplete();
// both of the following will get the onNext/onComplete calls from above
subject.subscribe(observer1);
subject.subscribe(observer2);
Modifier and Type | Method and Description |
---|---|
void | cleanupBuffer(): Makes sure the item cached by the head node in a bounded ReplaySubject is released (as it is never part of a replay). |
static <T> ReplaySubject<T> | create(): Creates an unbounded replay subject. |
static <T> ReplaySubject<T> | create(int capacityHint): Creates an unbounded replay subject with the specified initial buffer capacity. |
static <T> ReplaySubject<T> | createWithSize(int maxSize): Creates a size-bounded replay subject. |
static <T> ReplaySubject<T> | createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler): Creates a time-bounded replay subject. |
static <T> ReplaySubject<T> | createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize): Creates a time- and size-bounded replay subject. |
Throwable | getThrowable(): Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet. |
T | getValue(): Returns a single value the Subject currently has or null if no such value exists. |
Object[] | getValues(): Returns an Object array containing a snapshot of all values of the Subject. |
T[] | getValues(T[] array): Returns a typed array containing a snapshot of all values of the Subject. |
boolean | hasComplete(): Returns true if the subject has reached a terminal state through a complete event. |
boolean | hasObservers(): Returns true if the subject has any Observers. |
boolean | hasThrowable(): Returns true if the subject has reached a terminal state through an error event. |
boolean | hasValue(): Returns true if the subject has any value. |
void | onComplete(): Notifies the Observer that the Observable has finished sending push-based notifications. |
void | onError(Throwable t): Notifies the Observer that the Observable has experienced an error condition. |
void | onNext(T t): Provides the Observer with a new item to observe. |
void | onSubscribe(Disposable d): Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner. |
protected void | subscribeActual(Observer<? super T> observer): Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers. |
Methods inherited from class Subject:
toSerialized
Methods inherited from class Observable:
all, amb, ambArray, ambWith, any, as, blockingFirst, blockingFirst, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestDelayError, combineLatestDelayError, combineLatestDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, dematerialize, distinct, distinct, distinct, 
distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, forEach, forEachWhile, forEachWhile, forEachWhile, fromArray, fromCallable, fromFuture, fromFuture, fromFuture, fromFuture, fromIterable, fromPublisher, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lift, map, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorResumeNext, onErrorResumeNext, onErrorReturn, onErrorReturnItem, onExceptionResumeNext, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, 
singleOrError, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWithArray, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipIterable, zipWith, zipWith, zipWith, zipWith
@CheckReturnValue @NonNull public static <T> ReplaySubject<T> create()
Creates an unbounded replay subject.
The internal buffer is backed by an ArrayList and starts with an initial capacity of 16. Once the number of items reaches this capacity, it will grow as necessary (usually by 50%). However, as the number of items grows, this causes frequent array reallocation and copying, and may hurt performance and latency. This can be avoided with the create(int) overload which takes an initial capacity parameter and can be tuned to reduce the array reallocation frequency as needed.
T - the type of items observed and emitted by the Subject
@CheckReturnValue @NonNull public static <T> ReplaySubject<T> create(int capacityHint)
Creates an unbounded replay subject with the specified initial buffer capacity.
Use this method to avoid excessive array reallocation while the internal buffer grows to accommodate new items. For example, if you know that the buffer will hold 32k items, you can ask the ReplaySubject to preallocate its internal array with a capacity to hold that many items. Once the items start to arrive, the internal array won't need to grow, creating less garbage and no overhead due to frequent array-copying.
T - the type of items observed and emitted by the Subject
capacityHint - the initial buffer capacity
@CheckReturnValue @NonNull public static <T> ReplaySubject<T> createWithSize(int maxSize)
Creates a size-bounded replay subject.
In this setting, the ReplaySubject holds at most maxSize items in its internal buffer and discards the oldest item.
When observers subscribe to a terminated ReplaySubject, they are guaranteed to see at most maxSize onNext events followed by a termination event.
If an observer subscribes while the ReplaySubject is active, it will observe all items in the buffer at that point in time and each item observed afterwards, even if the buffer evicts items due to the size constraint in the meantime. In other words, once an Observer subscribes, it will receive items without gaps in the sequence.
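The gap-free guarantee for an Observer that subscribes mid-stream can be sketched as below, assuming RxJava 2.x on the classpath; the class name GaplessSizeBound is illustrative.

```java
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.ReplaySubject;
import java.util.List;

public class GaplessSizeBound {
    // An Observer subscribing mid-stream gets the buffered tail plus every later item.
    static List<Integer> midStreamObserver() {
        ReplaySubject<Integer> subject = ReplaySubject.createWithSize(2);
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        TestObserver<Integer> observer = subject.test(); // replays 2 and 3
        subject.onNext(4);
        subject.onNext(5); // the replay window is now 4 and 5, yet the observer misses nothing
        return observer.values();
    }

    public static void main(String[] args) {
        System.out.println(midStreamObserver()); // [2, 3, 4, 5]
    }
}
```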
T - the type of items observed and emitted by the Subject
maxSize - the maximum number of buffered items
@CheckReturnValue @NonNull public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler)
Creates a time-bounded replay subject.
In this setting, the ReplaySubject internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination event, leaving the buffer empty.
Once the subject is terminated, observers subscribing to it will receive the items that remained in the buffer after the terminal event; as stated in the class description, the age of those items is still checked when replaying, so a late observer may receive only the terminal event.
If an observer subscribes while the ReplaySubject is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once an observer subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.
Note that terminal notifications (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete notification arrives at T=10. If an observer subscribes at T=11, it will find an empty ReplaySubject with just an onComplete notification.
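Both time-bound scenarios can be reproduced deterministically with a virtual clock. The sketch assumes RxJava 2.x on the classpath and uses its TestScheduler as the time source; the choice of seconds as the unit and the class name TimeBoundReplay are assumptions made for the example.

```java
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.ReplaySubject;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TimeBoundReplay {
    // Live subject: a late Observer only gets items younger than the max age.
    static List<String> liveReplay() {
        TestScheduler clock = new TestScheduler();
        ReplaySubject<String> subject = ReplaySubject.createWithTime(5, TimeUnit.SECONDS, clock);
        subject.onNext("old");                    // T=0
        clock.advanceTimeBy(3, TimeUnit.SECONDS);
        subject.onNext("recent");                 // T=3
        clock.advanceTimeBy(3, TimeUnit.SECONDS); // T=6: "old" is 6s old, "recent" is 3s old
        return subject.test().values();
    }

    // The T=0 / T=10 / T=11 scenario: the terminal event evicts the stale item.
    static List<String> replayAfterLateCompletion() {
        TestScheduler clock = new TestScheduler();
        ReplaySubject<String> subject = ReplaySubject.createWithTime(5, TimeUnit.SECONDS, clock);
        subject.onNext("first");                   // T=0
        clock.advanceTimeBy(10, TimeUnit.SECONDS);
        subject.onComplete();                      // T=10
        clock.advanceTimeBy(1, TimeUnit.SECONDS);  // T=11
        return subject.test().values();
    }

    public static void main(String[] args) {
        System.out.println(liveReplay());                // [recent]
        System.out.println(replayAfterLateCompletion()); // []
    }
}
```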
T - the type of items observed and emitted by the Subject
maxAge - the maximum age of the contained items
unit - the time unit of maxAge
scheduler - the Scheduler that provides the current time
@CheckReturnValue @NonNull public static <T> ReplaySubject<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize)
Creates a time- and size-bounded replay subject.
In this setting, the ReplaySubject internally tags each received item with a timestamp value supplied by the Scheduler and holds at most maxSize items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or the buffer reaches its maxSize limit.
When observers subscribe to a terminated ReplaySubject, they observe the items that remained in the buffer after the terminal notification, but at most maxSize items; as stated in the class description, the age of those items is still checked when replaying.
If an observer subscribes while the ReplaySubject is active, it will observe only those items from within the buffer that have age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once an observer subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.
Note that terminal notifications (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete notification arrives at T=10. If an observer subscribes at T=11, it will find an empty ReplaySubject with just an onComplete notification.
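How the two bounds interact can be sketched with a virtual clock, assuming RxJava 2.x on the classpath; TestScheduler, the seconds unit and the class name TimeAndSizeBoundReplay are choices made for the illustration.

```java
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.ReplaySubject;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TimeAndSizeBoundReplay {
    // Returns what a late Observer sees after waitSeconds more seconds have passed.
    static List<String> replayAfter(long waitSeconds) {
        TestScheduler clock = new TestScheduler();
        ReplaySubject<String> subject =
                ReplaySubject.createWithTimeAndSize(5, TimeUnit.SECONDS, clock, 2);
        subject.onNext("a");                      // T=0
        clock.advanceTimeBy(1, TimeUnit.SECONDS);
        subject.onNext("b");                      // T=1
        clock.advanceTimeBy(1, TimeUnit.SECONDS);
        subject.onNext("c");                      // T=2, size limit of 2 pushes "a" out
        clock.advanceTimeBy(waitSeconds, TimeUnit.SECONDS);
        return subject.test().values();
    }

    public static void main(String[] args) {
        System.out.println(replayAfter(0)); // [b, c]  only the size bound applies
        System.out.println(replayAfter(4)); // [c]     at T=6 "b" has reached the max age of 5
    }
}
```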
T - the type of items observed and emitted by the Subject
maxAge - the maximum age of the contained items
unit - the time unit of maxAge
maxSize - the maximum number of buffered items
scheduler - the Scheduler that provides the current time
protected void subscribeActual(Observer<? super T> observer)
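Description copied from class: Observable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.
There is no need to call any of the plugin hooks on the current Observable instance or the Observer; all hooks and basic safeguards have been applied by Observable.subscribe(Observer) before this method gets called.
Specified by: subscribeActual in class Observable<T>
observer - the incoming Observer, never null
public void onSubscribe(Disposable d)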
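Description copied from interface: Observer
Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner.
d - the Disposable instance whose Disposable.dispose() can be called anytime to cancel the connection
public void onNext(T t)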
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onComplete() or Observer.onError(java.lang.Throwable).
t - the item emitted by the Observable
public void onError(Throwable t)
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onComplete().
t - the exception encountered by the Observable
public void onComplete()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).
public boolean hasObservers()
Description copied from class: Subject
Returns true if the subject has any Observers.
The method is thread-safe.
Specified by: hasObservers in class Subject<T>
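@Nullable public Throwable getThrowable()
Description copied from class: Subject
Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.
The method is thread-safe.
Specified by: getThrowable in class Subject<T>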
@Nullable public T getValue()
Returns a single value the Subject currently has or null if no such value exists.
The method is thread-safe.
public void cleanupBuffer()
Makes sure the item cached by the head node in a bounded ReplaySubject is released (as it is never part of a replay).
By default, live bounded buffers will remember one item before the currently receivable one to ensure subscribers can always receive a continuous sequence of items. A terminated ReplaySubject automatically releases this inaccessible item.
The method must be called sequentially, similar to the standard onXXX methods.
History: 2.1.11 - experimental
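A usage sketch for cleanupBuffer(), assuming RxJava 2.x (2.1.11 or later) on the classpath. The released reference itself cannot be observed through the public API, so the example only shows where the call fits and that the replayable content is unaffected; the class name is illustrative.

```java
import io.reactivex.subjects.ReplaySubject;
import java.util.Arrays;
import java.util.List;

public class CleanupBufferUsage {
    static List<Object> valuesAfterCleanup() {
        ReplaySubject<String> subject = ReplaySubject.createWithSize(1);
        subject.onNext("a");
        subject.onNext("b");     // "a" is no longer replayable but may still be referenced internally
        subject.cleanupBuffer(); // must not overlap with onNext/onError/onComplete calls
        return Arrays.asList(subject.getValues());
    }

    public static void main(String[] args) {
        System.out.println(valuesAfterCleanup()); // [b]
    }
}
```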
public Object[] getValues()
Returns an Object array containing a snapshot of all values of the Subject.
The method is thread-safe.
public T[] getValues(T[] array)
Returns a typed array containing a snapshot of all values of the Subject.
The method follows the conventions of Collection.toArray by setting the array element after the last value to null (if the capacity permits).
The method is thread-safe.
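The Collection.toArray-style behavior can be seen by passing arrays of different sizes, as in this sketch. It assumes RxJava 2.x on the classpath; the class name TypedSnapshot and the "x" filler values are placeholders.

```java
import io.reactivex.subjects.ReplaySubject;
import java.util.Arrays;

public class TypedSnapshot {
    static ReplaySubject<String> twoItems() {
        ReplaySubject<String> subject = ReplaySubject.create();
        subject.onNext("a");
        subject.onNext("b");
        return subject;
    }

    // The target is large enough: it is filled in place and null-terminated.
    static String[] intoLargerArray() {
        String[] target = {"x", "x", "x", "x"};
        return twoItems().getValues(target);
    }

    // The target is too small: a new array of the right size is returned.
    static String[] intoEmptyArray() {
        return twoItems().getValues(new String[0]);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(intoLargerArray())); // [a, b, null, x]
        System.out.println(Arrays.toString(intoEmptyArray()));  // [a, b]
    }
}
```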
array - the target array to copy values into if it fits
public boolean hasComplete()
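Description copied from class: Subject
Returns true if the subject has reached a terminal state through a complete event.
The method is thread-safe.
Specified by: hasComplete in class Subject<T>
See Also: Subject.hasThrowable()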
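public boolean hasThrowable()
Description copied from class: Subject
Returns true if the subject has reached a terminal state through an error event.
The method is thread-safe.
Specified by: hasThrowable in class Subject<T>
See Also: Subject.getThrowable(), Subject.hasComplete()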
public boolean hasValue()
Returns true if the subject has any value.
The method is thread-safe.