T
- the input and output value type@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final class MulticastProcessor<T> extends FlowableProcessor<T>
FlowableProcessor
implementation that coordinates downstream requests through
a front-buffer and stable-prefetching, optionally canceling the upstream if all
subscribers have cancelled.
This processor does not have a public constructor by design; a new empty instance of this
MulticastProcessor
can be created via the following create
methods that
allow configuring it:
create()
: create an empty MulticastProcessor
with
Flowable.bufferSize()
prefetch amount
and no reference counting behavior.create(int)
: create an empty MulticastProcessor
with
the given prefetch amount and no reference counting behavior.create(boolean)
: create an empty MulticastProcessor
with
Flowable.bufferSize()
prefetch amount
and an optional reference counting behavior.create(int, boolean)
: create an empty MulticastProcessor
with
the given prefetch amount and an optional reference counting behavior.
When the reference counting behavior is enabled, the MulticastProcessor
cancels its
upstream when all Subscriber
s have cancelled. Late Subscriber
s will then be
immediately completed.
Because MulticastProcessor
implements the Subscriber
interface, calling
onSubscribe
is mandatory (Rule 2.12).
If MulticastProcessor
should run standalone, i.e., without subscribing the MulticastProcessor
to another Publisher
,
use start()
or startUnbounded()
methods to initialize the internal buffer.
Failing to do so will lead to a NullPointerException
at runtime.
Use offer(Object)
to try and offer/emit items but don't fail if the
internal buffer is full.
A MulticastProcessor
is a Processor
type in the Reactive Streams specification,
null
s are not allowed (Rule 2.13) as
parameters to onSubscribe(Subscription)
, offer(Object)
, onNext(Object)
and onError(Throwable)
.
Such calls will result in a NullPointerException
being thrown and the processor's state is not changed.
Since a MulticastProcessor
is a Flowable
, it supports backpressure.
The backpressure from the currently subscribed Subscriber
s are coordinated by emitting upstream
items only if all of those Subscriber
s have requested at least one item. This behavior
is also called lockstep-mode because even if some Subscriber
s can take any number
of items, other Subscriber
s requesting less or infrequently will slow down the overall
throughput of the flow.
Calling onNext(Object)
, offer(Object)
, onError(Throwable)
and onComplete()
is required to be serialized (called from the same thread or called non-overlappingly from different threads
through external means of serialization). The FlowableProcessor.toSerialized()
method available to all FlowableProcessor
s
provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber
consuming this processor also wants to call onNext(Object)
on this processor recursively).
This MulticastProcessor
supports the standard state-peeking methods hasComplete()
, hasThrowable()
,
getThrowable()
and hasSubscribers()
. This processor doesn't allow peeking into its buffer.
When this MulticastProcessor
is terminated via onError(Throwable)
or onComplete()
,
all previously signaled but not yet consumed items will be still available to Subscriber
s and the respective
terminal even is only emitted when all previous items have been successfully delivered to Subscriber
s.
If there are no Subscriber
s, the remaining items will be buffered indefinitely.
The MulticastProcessor
does not support clearing its cached events (to appear empty again).
Subscriber
s are coordinated by emitting upstream
items only if all of those Subscriber
s have requested at least one item. This behavior
is also called lockstep-mode because even if some Subscriber
s can take any number
of items, other Subscriber
s requesting less or infrequently will slow down the overall
throughput of the flow.MulticastProcessor
does not operate by default on a particular Scheduler
and
the Subscriber
s get notified on an arbitrary thread in a serialized fashion.Example:
MulticastProcessor<Integer> mp = Flowable.range(1, 10)
.subscribeWith(MulticastProcessor.create());
mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// --------------------
MulticastProcessor<Integer> mp2 = MulticastProcessor.create(4);
mp2.start();
assertTrue(mp2.offer(1));
assertTrue(mp2.offer(2));
assertTrue(mp2.offer(3));
assertTrue(mp2.offer(4));
assertFalse(mp2.offer(5));
mp2.onComplete();
mp2.test().assertResult(1, 2, 3, 4);
History: 2.1.14 - experimental
Modifier and Type | Method and Description |
---|---|
static <T> @NonNull MulticastProcessor<T> |
create()
Constructs a fresh instance with the default Flowable.bufferSize() prefetch
amount and no refCount-behavior.
|
static <T> @NonNull MulticastProcessor<T> |
create(boolean refCount)
Constructs a fresh instance with the default Flowable.bufferSize() prefetch
amount and the optional refCount-behavior.
|
static <T> @NonNull MulticastProcessor<T> |
create(int bufferSize)
Constructs a fresh instance with the given prefetch amount and no refCount behavior.
|
static <T> @NonNull MulticastProcessor<T> |
create(int bufferSize,
boolean refCount)
Constructs a fresh instance with the given prefetch amount and the optional
refCount-behavior.
|
Throwable |
getThrowable()
Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor
hasn't terminated yet.
|
boolean |
hasComplete()
Returns true if the FlowableProcessor has reached a terminal state through a complete event.
|
boolean |
hasSubscribers()
Returns true if the FlowableProcessor has subscribers.
|
boolean |
hasThrowable()
Returns true if the FlowableProcessor has reached a terminal state through an error event.
|
boolean |
offer(T t)
Tries to offer an item into the internal queue and returns false
if the queue is full.
|
void |
onComplete() |
void |
onError(@NonNull Throwable t) |
void |
onNext(T t) |
void |
onSubscribe(@NonNull Subscription s)
Implementors of this method should make sure everything that needs
to be visible in
Subscriber.onNext(Object) is established before
calling Subscription.request(long) . |
void |
start()
Initializes this Processor by setting an upstream Subscription that
ignores request amounts, uses a fixed buffer
and allows using the onXXX and offer methods
afterwards.
|
void |
startUnbounded()
Initializes this Processor by setting an upstream Subscription that
ignores request amounts, uses an unbounded buffer
and allows using the onXXX and offer methods
afterwards.
|
protected void |
subscribeActual(@NonNull Subscriber<? super T> s)
Operator implementations (both source and intermediate) should implement this method that
performs the necessary business logic and handles the incoming
Subscriber s. |
toSerialized
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
@CheckReturnValue @NonNull public static <T> @NonNull MulticastProcessor<T> create()
T
- the input and output value type@CheckReturnValue @NonNull public static <T> @NonNull MulticastProcessor<T> create(boolean refCount)
T
- the input and output value typerefCount
- if true and if all Subscribers have canceled, the upstream
is cancelled@CheckReturnValue @NonNull public static <T> @NonNull MulticastProcessor<T> create(int bufferSize)
T
- the input and output value typebufferSize
- the prefetch amountIllegalArgumentException
- if bufferSize
is non-positive@CheckReturnValue @NonNull public static <T> @NonNull MulticastProcessor<T> create(int bufferSize, boolean refCount)
T
- the input and output value typebufferSize
- the prefetch amountrefCount
- if true and if all Subscribers have canceled, the upstream
is cancelledIllegalArgumentException
- if bufferSize
is non-positivepublic void start()
public void startUnbounded()
public void onSubscribe(@NonNull Subscription s)
FlowableSubscriber
Subscriber.onNext(Object)
is established before
calling Subscription.request(long)
. In practice this means
no initialization should happen after the request()
call and
additional behavior is thread safe in respect to onNext
.
@CheckReturnValue public boolean offer(@NonNull T t)
t
- the item to offer, not null
NullPointerException
- if t
is null
IllegalStateException
- if the processor is in fusion modepublic void onComplete()
@CheckReturnValue public boolean hasSubscribers()
FlowableProcessor
The method is thread-safe.
hasSubscribers
in class FlowableProcessor<T>
@CheckReturnValue public boolean hasThrowable()
FlowableProcessor
The method is thread-safe.
hasThrowable
in class FlowableProcessor<T>
FlowableProcessor.getThrowable()
,
FlowableProcessor.hasComplete()
@CheckReturnValue public boolean hasComplete()
FlowableProcessor
The method is thread-safe.
hasComplete
in class FlowableProcessor<T>
FlowableProcessor.hasThrowable()
@CheckReturnValue public Throwable getThrowable()
FlowableProcessor
The method is thread-safe.
getThrowable
in class FlowableProcessor<T>
protected void subscribeActual(@NonNull Subscriber<? super T> s)
Flowable
Subscriber
s.
There is no need to call any of the plugin hooks on the current Flowable
instance or
the Subscriber
; all hooks and basic safeguards have been
applied by Flowable.subscribe(Subscriber)
before this method gets called.
subscribeActual
in class Flowable<T>
s
- the incoming Subscriber
, never null