Class MulticastProcessor<T>
- Type Parameters:
T- the input and output value type
- All Implemented Interfaces:
FlowableSubscriber<T>, Flow.Processor<T,T>, Flow.Publisher<T>, Flow.Subscriber<T>
FlowableProcessor implementation that coordinates downstream requests through
a front-buffer and stable-prefetching, optionally canceling the upstream if all
subscribers have cancelled.
This processor does not have a public constructor by design; a new empty instance of this
MulticastProcessor can be created via the following create methods that
allow configuring it:
create(): create an emptyMulticastProcessorwithFlowable.bufferSize()prefetch amount and no reference counting behavior.create(int): create an emptyMulticastProcessorwith the given prefetch amount and no reference counting behavior.create(boolean): create an emptyMulticastProcessorwithFlowable.bufferSize()prefetch amount and an optional reference counting behavior.create(int, boolean): create an emptyMulticastProcessorwith the given prefetch amount and an optional reference counting behavior.
When the reference counting behavior is enabled, the MulticastProcessor cancels its
upstream when all Flow.Subscribers have cancelled. Late Subscribers will then be
immediately completed.
Because MulticastProcessor implements the Flow.Subscriber interface, calling
onSubscribe is mandatory (Rule 2.12).
If MulticastProcessor should run standalone, i.e., without subscribing the MulticastProcessor to another Flow.Publisher,
use start() or startUnbounded() methods to initialize the internal buffer.
Failing to do so will lead to a NullPointerException at runtime.
Use offer(Object) to try and offer/emit items but don't fail if the
internal buffer is full.
A MulticastProcessor is a Flow.Processor type in the Reactive Streams specification,
nulls are not allowed (Rule 2.13) as
parameters to onSubscribe(Subscription), offer(Object), onNext(Object) and onError(Throwable).
Such calls will result in a NullPointerException being thrown and the processor's state is not changed.
Since a MulticastProcessor is a Flowable, it supports backpressure.
The backpressure from the currently subscribed Flow.Subscribers are coordinated by emitting upstream
items only if all of those Subscribers have requested at least one item. This behavior
is also called lockstep-mode because even if some Subscribers can take any number
of items, other Subscribers requesting less or infrequently will slow down the overall
throughput of the flow.
Calling onNext(Object), offer(Object), onError(Throwable) and onComplete()
is required to be serialized (called from the same thread or called non-overlappingly from different threads
through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors
provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber
consuming this processor also wants to call onNext(Object) on this processor recursively).
This MulticastProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(),
getThrowable() and hasSubscribers(). This processor doesn't allow peeking into its buffer.
When this MulticastProcessor is terminated via onError(Throwable) or onComplete(),
all previously signaled but not yet consumed items will be still available to Subscribers and the respective
terminal even is only emitted when all previous items have been successfully delivered to Subscribers.
If there are no Subscribers, the remaining items will be buffered indefinitely.
The MulticastProcessor does not support clearing its cached events (to appear empty again).
- Backpressure:
- The backpressure from the currently subscribed
Subscribers are coordinated by emitting upstream items only if all of thoseSubscribers have requested at least one item. This behavior is also called lockstep-mode because even if someSubscribers can take any number of items, otherSubscribers requesting less or infrequently will slow down the overall throughput of the flow. - Scheduler:
MulticastProcessordoes not operate by default on a particularSchedulerand theSubscribers get notified on an arbitrary thread in a serialized fashion.
Example:
MulticastProcessor<Integer> mp = Flowable.range(1, 10)
.subscribeWith(MulticastProcessor.create());
mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// --------------------
MulticastProcessor<Integer> mp2 = MulticastProcessor.create(4);
mp2.start();
assertTrue(mp2.offer(1));
assertTrue(mp2.offer(2));
assertTrue(mp2.offer(3));
assertTrue(mp2.offer(4));
assertFalse(mp2.offer(5));
mp2.onComplete();
mp2.test().assertResult(1, 2, 3, 4);
History: 2.1.14 - experimental
- Since:
- 2.2
-
Method Summary
Modifier and TypeMethodDescriptionstatic <T> @NonNull MulticastProcessor<T> create()Constructs a fresh instance with the default Flowable.bufferSize() prefetch amount and no refCount-behavior.static <T> @NonNull MulticastProcessor<T> create(boolean refCount) Constructs a fresh instance with the default Flowable.bufferSize() prefetch amount and the optional refCount-behavior.static <T> @NonNull MulticastProcessor<T> create(int bufferSize) Constructs a fresh instance with the given prefetch amount and no refCount behavior.static <T> @NonNull MulticastProcessor<T> create(int bufferSize, boolean refCount) Constructs a fresh instance with the given prefetch amount and the optional refCount-behavior.Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.booleanReturns true if the FlowableProcessor has reached a terminal state through a complete event.booleanReturns true if the FlowableProcessor has subscribers.booleanReturns true if the FlowableProcessor has reached a terminal state through an error event.booleanTries to offer an item into the internal queue and returns false if the queue is full.voidvoidvoidvoidImplementors of this method should make sure everything that needs to be visible inFlow.Subscriber.onNext(Object)is established before callingFlow.Subscription.request(long).voidstart()Initializes this Processor by setting an upstream Subscription that ignores request amounts, uses a fixed buffer and allows using the onXXX and offer methods afterwards.voidInitializes this Processor by setting an upstream Subscription that ignores request amounts, uses an unbounded buffer and allows using the onXXX and offer methods afterwards.protected voidsubscribeActual(@NonNull Flow.Subscriber<? super @NonNull T> s) Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingFlow.Subscribers.Methods inherited from class FlowableProcessor
toSerializedMethods inherited from class Flowable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWithMethods inherited from class Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface Flow.Publisher
subscribe
-
Method Details
-
create
Constructs a fresh instance with the default Flowable.bufferSize() prefetch amount and no refCount-behavior.- Type Parameters:
T- the input and output value type- Returns:
- the new MulticastProcessor instance
-
create
@CheckReturnValue @NonNull public static <T> @NonNull MulticastProcessor<T> create(boolean refCount) Constructs a fresh instance with the default Flowable.bufferSize() prefetch amount and the optional refCount-behavior.- Type Parameters:
T- the input and output value type- Parameters:
refCount- if true and if all Subscribers have canceled, the upstream is cancelled- Returns:
- the new MulticastProcessor instance
-
create
Constructs a fresh instance with the given prefetch amount and no refCount behavior.- Type Parameters:
T- the input and output value type- Parameters:
bufferSize- the prefetch amount- Returns:
- the new MulticastProcessor instance
- Throws:
IllegalArgumentException- ifbufferSizeis non-positive
-
create
@CheckReturnValue @NonNull public static <T> @NonNull MulticastProcessor<T> create(int bufferSize, boolean refCount) Constructs a fresh instance with the given prefetch amount and the optional refCount-behavior.- Type Parameters:
T- the input and output value type- Parameters:
bufferSize- the prefetch amountrefCount- if true and if all Subscribers have canceled, the upstream is cancelled- Returns:
- the new MulticastProcessor instance
- Throws:
IllegalArgumentException- ifbufferSizeis non-positive
-
start
public void start()Initializes this Processor by setting an upstream Subscription that ignores request amounts, uses a fixed buffer and allows using the onXXX and offer methods afterwards. -
startUnbounded
public void startUnbounded()Initializes this Processor by setting an upstream Subscription that ignores request amounts, uses an unbounded buffer and allows using the onXXX and offer methods afterwards. -
onSubscribe
Description copied from interface:FlowableSubscriberImplementors of this method should make sure everything that needs to be visible inFlow.Subscriber.onNext(Object)is established before callingFlow.Subscription.request(long). In practice this means no initialization should happen after therequest()call and additional behavior is thread safe in respect toonNext. -
onNext
-
offer
Tries to offer an item into the internal queue and returns false if the queue is full.- Parameters:
t- the item to offer, notnull- Returns:
- true if successful, false if the queue is full
- Throws:
NullPointerException- iftisnullIllegalStateException- if the processor is in fusion mode
-
onError
-
onComplete
public void onComplete() -
hasSubscribers
Description copied from class:FlowableProcessorReturns true if the FlowableProcessor has subscribers.The method is thread-safe.
- Specified by:
hasSubscribersin classFlowableProcessor<T>- Returns:
- true if the FlowableProcessor has subscribers
-
hasThrowable
Description copied from class:FlowableProcessorReturns true if the FlowableProcessor has reached a terminal state through an error event.The method is thread-safe.
- Specified by:
hasThrowablein classFlowableProcessor<T>- Returns:
- true if the FlowableProcessor has reached a terminal state through an error event
- See Also:
-
hasComplete
Description copied from class:FlowableProcessorReturns true if the FlowableProcessor has reached a terminal state through a complete event.The method is thread-safe.
- Specified by:
hasCompletein classFlowableProcessor<T>- Returns:
- true if the FlowableProcessor has reached a terminal state through a complete event
- See Also:
-
getThrowable
Description copied from class:FlowableProcessorReturns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.The method is thread-safe.
- Specified by:
getThrowablein classFlowableProcessor<T>- Returns:
- the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet
-
subscribeActual
Description copied from class:FlowableOperator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingFlow.Subscribers.There is no need to call any of the plugin hooks on the current
Flowableinstance or theSubscriber; all hooks and basic safeguards have been applied byFlowable.subscribe(Subscriber)before this method gets called.- Specified by:
subscribeActualin classFlowable<T>- Parameters:
s- the incomingSubscriber, nevernull
-