T
- the value typepublic final class ReplayProcessor<T> extends FlowableProcessor<T>
The ReplayProcessor
supports the following item retention strategies:
create()
and create(int)
: retains and replays all events to current and
future Subscriber
s.
createWithSize(int)
: retains at most the given number of items and replays only these
latest items to new Subscriber
s.
createWithTime(long, TimeUnit, Scheduler)
: retains items no older than the specified time
and replays them to new Subscriber
s (which could mean all items age out).
createWithTimeAndSize(long, TimeUnit, Scheduler, int)
: retains no more than the given number of items
which are also no older than the specified time and replays them to new Subscriber
s (which could mean all items age out).
The ReplayProcessor
can be created in bounded and unbounded mode. It can be bounded by
size (maximum number of elements retained at most) and/or time (maximum age of elements replayed).
Since a ReplayProcessor
is a Reactive Streams Processor
,
null
s are not allowed (Rule 2.13) as
parameters to onNext(Object)
and onError(Throwable)
. Such calls will result in a
NullPointerException
being thrown and the processor's state is not changed.
This ReplayProcessor
respects the individual backpressure behavior of its Subscriber
s but
does not coordinate their request amounts towards the upstream (because there might not be any) and
consumes the upstream in an unbounded manner (requesting Long.MAX_VALUE
).
Note that Subscriber
s receive a continuous sequence of values after they subscribed even
if an individual item gets delayed due to backpressure.
Due to concurrency requirements, a size-bounded ReplayProcessor
may hold strong references to more source
emissions than specified.
When this ReplayProcessor
is terminated via onError(Throwable)
or onComplete()
,
late Subscriber
s will receive the retained/cached items first (if any) followed by the respective
terminal event. If the ReplayProcessor
has a time-bound, the age of the retained/cached items are still considered
when replaying and thus it may result in no items being emitted before the terminal event.
Once an Subscriber
has subscribed, it will receive items continuously from that point on. Bounds only affect how
many past items a new Subscriber
will receive before it catches up with the live event feed.
Even though ReplayProcessor
implements the Subscriber
interface, calling
onSubscribe
is not required (Rule 2.12)
if the processor is used as a standalone source. However, calling onSubscribe
after the ReplayProcessor
reached its terminal state will result in the
given Subscription
being canceled immediately.
Calling onNext(Object)
, onError(Throwable)
and onComplete()
is required to be serialized (called from the same thread or called non-overlappingly from different threads
through external means of serialization). The FlowableProcessor.toSerialized()
method available to all FlowableProcessor
s
provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber
consuming this processor also wants to call onNext(Object)
on this processor recursively).
This ReplayProcessor
supports the standard state-peeking methods hasComplete()
, hasThrowable()
,
getThrowable()
and hasSubscribers()
as well as means to read the retained/cached items
in a non-blocking and thread-safe manner via hasValue()
, getValue()
,
getValues()
or getValues(Object[])
.
Note that due to concurrency requirements, a size- and time-bounded ReplayProcessor
may hold strong references to more
source emissions than specified while it isn't terminated yet. Use the cleanupBuffer()
to allow
such inaccessible items to be cleaned up by GC once no consumer references them anymore.
ReplayProcessor
respects the individual backpressure behavior of its Subscriber
s but
does not coordinate their request amounts towards the upstream (because there might not be any) and
consumes the upstream in an unbounded manner (requesting Long.MAX_VALUE
).
Note that Subscriber
s receive a continuous sequence of values after they subscribed even
if an individual item gets delayed due to backpressure.ReplayProcessor
does not operate by default on a particular Scheduler
and
the Subscriber
s get notified on the thread the respective onXXX
methods were invoked.
Time-bound ReplayProcessor
s use the given Scheduler
in their create
methods
as time source to timestamp of items received for the age checks.onError(Throwable)
is called, the ReplayProcessor
enters into a terminal state
and emits the same Throwable
instance to the last set of Subscriber
s. During this emission,
if one or more Subscriber
s cancel their respective Subscription
s, the
Throwable
is delivered to the global error handler via
RxJavaPlugins.onError(Throwable)
(multiple times if multiple Subscriber
s
cancel at once).
If there were no Subscriber
s subscribed to this ReplayProcessor
when the onError()
was called, the global error handler is not invoked.
Example usage:
ReplayProcessor<Object> processor = new ReplayProcessor<T>();
processor.onNext("one");
processor.onNext("two");
processor.onNext("three");
processor.onComplete();
// both of the following will get the onNext/onComplete calls from above
processor.subscribe(subscriber1);
processor.subscribe(subscriber2);
Modifier and Type | Method and Description |
---|---|
void |
cleanupBuffer()
Makes sure the item cached by the head node in a bounded
ReplayProcessor is released (as it is never part of a replay).
|
static <T> @NonNull ReplayProcessor<T> |
create()
Creates an unbounded ReplayProcessor.
|
static <T> @NonNull ReplayProcessor<T> |
create(int capacityHint)
Creates an unbounded ReplayProcessor with the specified initial buffer capacity.
|
static <T> @NonNull ReplayProcessor<T> |
createWithSize(int maxSize)
Creates a size-bounded ReplayProcessor.
|
static <T> @NonNull ReplayProcessor<T> |
createWithTime(long maxAge,
@NonNull TimeUnit unit,
@NonNull Scheduler scheduler)
Creates a time-bounded ReplayProcessor.
|
static <T> @NonNull ReplayProcessor<T> |
createWithTimeAndSize(long maxAge,
@NonNull TimeUnit unit,
@NonNull Scheduler scheduler,
int maxSize)
Creates a time- and size-bounded ReplayProcessor.
|
@Nullable Throwable |
getThrowable()
Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor
hasn't terminated yet.
|
T |
getValue()
Returns the latest value this processor has or null if no such value exists.
|
Object[] |
getValues()
Returns an Object array containing snapshot all values of this processor.
|
T[] |
getValues(T[] array)
Returns a typed array containing a snapshot of all values of this processor.
|
boolean |
hasComplete()
Returns true if the FlowableProcessor has reached a terminal state through a complete event.
|
boolean |
hasSubscribers()
Returns true if the FlowableProcessor has subscribers.
|
boolean |
hasThrowable()
Returns true if the FlowableProcessor has reached a terminal state through an error event.
|
boolean |
hasValue()
Returns true if this processor has any value.
|
void |
onComplete() |
void |
onError(Throwable t) |
void |
onNext(T t) |
void |
onSubscribe(Subscription s)
Implementors of this method should make sure everything that needs
to be visible in
Subscriber.onNext(Object) is established before
calling Subscription.request(long) . |
protected void |
subscribeActual(Subscriber<? super T> s)
Operator implementations (both source and intermediate) should implement this method that
performs the necessary business logic and handles the incoming
Subscriber s. |
toSerialized
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
@CheckReturnValue @NonNull public static <T> @NonNull ReplayProcessor<T> create()
The internal buffer is backed by an ArrayList
and starts with an initial capacity of 16. Once the
number of items reaches this capacity, it will grow as necessary (usually by 50%). However, as the
number of items grows, this causes frequent array reallocation and copying, and may hurt performance
and latency. This can be avoided with the create(int)
overload which takes an initial capacity
parameter and can be tuned to reduce the array reallocation frequency as needed.
T
- the type of items observed and emitted by the ReplayProcessor@CheckReturnValue @NonNull public static <T> @NonNull ReplayProcessor<T> create(int capacityHint)
Use this method to avoid excessive array reallocation while the internal buffer grows to accommodate new
items. For example, if you know that the buffer will hold 32k items, you can ask the
ReplayProcessor
to preallocate its internal array with a capacity to hold that many items. Once
the items start to arrive, the internal array won't need to grow, creating less garbage and no overhead
due to frequent array-copying.
T
- the type of items observed and emitted by this type of processorcapacityHint
- the initial buffer capacityIllegalArgumentException
- if capacityHint
is non-positive@CheckReturnValue @NonNull public static <T> @NonNull ReplayProcessor<T> createWithSize(int maxSize)
In this setting, the ReplayProcessor
holds at most size
items in its internal buffer and
discards the oldest item.
When Subscriber
s subscribe to a terminated ReplayProcessor
, they are guaranteed to see at most
size
onNext
events followed by a termination event.
If a Subscriber
subscribes while the ReplayProcessor
is active, it will observe all items in the
buffer at that point in time and each item observed afterwards, even if the buffer evicts items due to
the size constraint in the mean time. In other words, once a Subscriber
subscribes, it will receive items
without gaps in the sequence.
T
- the type of items observed and emitted by this type of processormaxSize
- the maximum number of buffered itemsIllegalArgumentException
- if maxSize
is non-positive@CheckReturnValue @NonNull public static <T> @NonNull ReplayProcessor<T> createWithTime(long maxAge, @NonNull TimeUnit unit, @NonNull Scheduler scheduler)
In this setting, the ReplayProcessor
internally tags each observed item with a timestamp value
supplied by the Scheduler
and keeps only those whose age is less than the supplied time value
converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5
this first item is then evicted by any subsequent item or termination event, leaving the buffer empty.
Once the processor is terminated, Subscriber
s subscribing to it will receive items that remained in the
buffer after the terminal event, regardless of their age.
If a Subscriber
subscribes while the ReplayProcessor
is active, it will observe only those items
from within the buffer that have an age less than the specified time, and each item observed thereafter,
even if the buffer evicts items due to the time constraint in the mean time. In other words, once a
Subscriber
subscribes, it observes items without gaps in the sequence except for any outdated items at the
beginning of the sequence.
Note that terminal notifications (onError
and onComplete
) trigger eviction as well. For
example, with a max age of 5, the first item is observed at T=0, then an onComplete
notification
arrives at T=10. If a Subscriber
subscribes at T=11, it will find an empty ReplayProcessor
with just
an onComplete
notification.
T
- the type of items observed and emitted by this type of processormaxAge
- the maximum age of the contained itemsunit
- the time unit of time
scheduler
- the Scheduler
that provides the current timeNullPointerException
- if unit
or scheduler
is null
IllegalArgumentException
- if maxAge
is non-positive@CheckReturnValue @NonNull public static <T> @NonNull ReplayProcessor<T> createWithTimeAndSize(long maxAge, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, int maxSize)
In this setting, the ReplayProcessor
internally tags each received item with a timestamp value
supplied by the Scheduler
and holds at most size
items in its internal buffer. It evicts
items from the start of the buffer if their age becomes less-than or equal to the supplied age in
milliseconds or the buffer reaches its size
limit.
When Subscriber
s subscribe to a terminated ReplayProcessor
, they observe the items that remained in
the buffer after the terminal notification, regardless of their age, but at most size
items.
If a Subscriber
subscribes while the ReplayProcessor
is active, it will observe only those items
from within the buffer that have age less than the specified time and each subsequent item, even if the
buffer evicts items due to the time constraint in the mean time. In other words, once a Subscriber
subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning
of the sequence.
Note that terminal notifications (onError
and onComplete
) trigger eviction as well. For
example, with a max age of 5, the first item is observed at T=0, then an onComplete
notification
arrives at T=10. If a Subscriber
subscribes at T=11, it will find an empty ReplayProcessor
with just
an onComplete
notification.
T
- the type of items observed and emitted by this type of processormaxAge
- the maximum age of the contained itemsunit
- the time unit of time
maxSize
- the maximum number of buffered itemsscheduler
- the Scheduler
that provides the current timeNullPointerException
- if unit
or scheduler
is null
IllegalArgumentException
- if maxAge
or maxSize
is non-positiveprotected void subscribeActual(Subscriber<? super T> s)
Flowable
Subscriber
s.
There is no need to call any of the plugin hooks on the current Flowable
instance or
the Subscriber
; all hooks and basic safeguards have been
applied by Flowable.subscribe(Subscriber)
before this method gets called.
subscribeActual
in class Flowable<T>
s
- the incoming Subscriber
, never null
public void onSubscribe(Subscription s)
FlowableSubscriber
Subscriber.onNext(Object)
is established before
calling Subscription.request(long)
. In practice this means
no initialization should happen after the request()
call and
additional behavior is thread safe in respect to onNext
.
public void onNext(T t)
public void onError(Throwable t)
public void onComplete()
@CheckReturnValue public boolean hasSubscribers()
FlowableProcessor
The method is thread-safe.
hasSubscribers
in class FlowableProcessor<T>
@Nullable @CheckReturnValue public @Nullable Throwable getThrowable()
FlowableProcessor
The method is thread-safe.
getThrowable
in class FlowableProcessor<T>
public void cleanupBuffer()
By default, live bounded buffers will remember one item before the currently receivable one to ensure subscribers can always receive a continuous sequence of items. A terminated ReplayProcessor automatically releases this inaccessible item.
The method must be called sequentially, similar to the standard
onXXX
methods.
History: 2.1.11 - experimental
@CheckReturnValue public T getValue()
The method is thread-safe.
@CheckReturnValue public Object[] getValues()
The method is thread-safe.
@CheckReturnValue public T[] getValues(T[] array)
The method follows the conventions of Collection.toArray by setting the array element after the last value to null (if the capacity permits).
The method is thread-safe.
array
- the target array to copy values into if it fits@CheckReturnValue public boolean hasComplete()
FlowableProcessor
The method is thread-safe.
hasComplete
in class FlowableProcessor<T>
FlowableProcessor.hasThrowable()
@CheckReturnValue public boolean hasThrowable()
FlowableProcessor
The method is thread-safe.
hasThrowable
in class FlowableProcessor<T>
FlowableProcessor.getThrowable()
,
FlowableProcessor.hasComplete()
@CheckReturnValue public boolean hasValue()
The method is thread-safe.