Class AsyncProcessor<T>
- Type Parameters:
T- the value type
- All Implemented Interfaces:
FlowableSubscriber<T>, Flow.Processor<T,T>, Flow.Publisher<T>, Flow.Subscriber<T>
Flow.Subscribers.
This processor does not have a public constructor by design; a new empty instance of this
AsyncProcessor can be created via the create() method.
Since an AsyncProcessor is a Reactive Streams Processor type,
nulls are not allowed (Rule 2.13)
as parameters to onNext(Object) and onError(Throwable). Such calls will result in a
NullPointerException being thrown and the processor's state is not changed.
AsyncProcessor is a Flowable as well as a FlowableProcessor and supports backpressure from the downstream but
its Flow.Subscriber-side consumes items in an unbounded manner.
When this AsyncProcessor is terminated via onError(Throwable), the
last observed item (if any) is cleared and late Flow.Subscribers only receive
the onError event.
The AsyncProcessor caches the latest item internally and it emits this item only when onComplete is called.
Therefore, it is not recommended to use this Processor with infinite or never-completing sources.
Even though AsyncProcessor implements the Flow.Subscriber interface, calling
onSubscribe is not required (Rule 2.12)
if the processor is used as a standalone source. However, calling onSubscribe
after the AsyncProcessor reached its terminal state will result in the
given Flow.Subscription being canceled immediately.
Calling onNext(Object), onError(Throwable) and onComplete()
is required to be serialized (called from the same thread or called non-overlappingly from different threads
through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors
provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber
consuming this processor also wants to call onNext(Object) on this processor recursively).
The implementation of onXXX methods are technically thread-safe but non-serialized calls
to them may lead to undefined state in the currently subscribed Subscribers.
This AsyncProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(),
getThrowable() and hasSubscribers() as well as means to read the very last observed value -
after this AsyncProcessor has been completed - in a non-blocking and thread-safe
manner via hasValue() or getValue().
- Backpressure:
- The
AsyncProcessorhonors the backpressure of the downstreamSubscribers and won't emit its single value to a particularSubscriberuntil thatSubscriberhas requested an item. When theAsyncProcessoris subscribed to aFlowable, the processor consumes thisFlowablein an unbounded manner (requestingLong.MAX_VALUE) as only the very last upstream item is retained by it. - Scheduler:
AsyncProcessordoes not operate by default on a particularSchedulerand theSubscribers get notified on the thread where the terminatingonErrororonCompletemethods were invoked.- Error handling:
- When the
onError(Throwable)is called, theAsyncProcessorenters into a terminal state and emits the sameThrowableinstance to the last set ofSubscribers. During this emission, if one or moreSubscribers dispose their respectiveSubscriptions, theThrowableis delivered to the global error handler viaRxJavaPlugins.onError(Throwable)(multiple times if multipleSubscribers cancel at once). If there were noSubscribers subscribed to thisAsyncProcessorwhen theonError()was called, the global error handler is not invoked.
Example usage:
AsyncProcessor<Object> processor = AsyncProcessor.create();
TestSubscriber<Object> ts1 = processor.test();
ts1.assertEmpty();
processor.onNext(1);
// AsyncProcessor only emits when onComplete was called.
ts1.assertEmpty();
processor.onNext(2);
processor.onComplete();
// onComplete triggers the emission of the last cached item and the onComplete event.
ts1.assertResult(2);
TestSubscriber<Object> ts2 = processor.test();
// late Subscribers receive the last cached item too
ts2.assertResult(2);
-
Method Summary
Modifier and TypeMethodDescriptionstatic <T> @NonNull AsyncProcessor<T> create()Creates a new AsyncProcessor.Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.getValue()Returns a single value this processor currently has or null if no such value exists.booleanReturns true if the FlowableProcessor has reached a terminal state through a complete event.booleanReturns true if the FlowableProcessor has subscribers.booleanReturns true if the FlowableProcessor has reached a terminal state through an error event.booleanhasValue()Returns true if this processor has any value.voidvoidvoidvoidImplementors of this method should make sure everything that needs to be visible inFlow.Subscriber.onNext(Object)is established before callingFlow.Subscription.request(long).protected voidsubscribeActual(@NonNull Flow.Subscriber<? super @NonNull T> s) Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingFlow.Subscribers.Methods inherited from class FlowableProcessor
toSerializedMethods inherited from class Flowable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWithMethods inherited from class Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface Flow.Publisher
subscribe
-
Method Details
-
create
Creates a new AsyncProcessor.- Type Parameters:
T- the value type to be received and emitted- Returns:
- the new AsyncProcessor instance
-
onSubscribe
Description copied from interface:FlowableSubscriberImplementors of this method should make sure everything that needs to be visible inFlow.Subscriber.onNext(Object)is established before callingFlow.Subscription.request(long). In practice this means no initialization should happen after therequest()call and additional behavior is thread safe in respect toonNext. -
onNext
-
onError
-
onComplete
public void onComplete() -
hasSubscribers
Description copied from class:FlowableProcessorReturns true if the FlowableProcessor has subscribers.The method is thread-safe.
- Specified by:
hasSubscribersin classFlowableProcessor<T>- Returns:
- true if the FlowableProcessor has subscribers
-
hasThrowable
Description copied from class:FlowableProcessorReturns true if the FlowableProcessor has reached a terminal state through an error event.The method is thread-safe.
- Specified by:
hasThrowablein classFlowableProcessor<T>- Returns:
- true if the FlowableProcessor has reached a terminal state through an error event
- See Also:
-
hasComplete
Description copied from class:FlowableProcessorReturns true if the FlowableProcessor has reached a terminal state through a complete event.The method is thread-safe.
- Specified by:
hasCompletein classFlowableProcessor<T>- Returns:
- true if the FlowableProcessor has reached a terminal state through a complete event
- See Also:
-
getThrowable
Description copied from class:FlowableProcessorReturns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.The method is thread-safe.
- Specified by:
getThrowablein classFlowableProcessor<T>- Returns:
- the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet
-
subscribeActual
Description copied from class:FlowableOperator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingFlow.Subscribers.There is no need to call any of the plugin hooks on the current
Flowableinstance or theSubscriber; all hooks and basic safeguards have been applied byFlowable.subscribe(Subscriber)before this method gets called.- Specified by:
subscribeActualin classFlowable<T>- Parameters:
s- the incomingSubscriber, nevernull
-
hasValue
Returns true if this processor has any value.The method is thread-safe.
- Returns:
- true if this processor has any value
-
getValue
Returns a single value this processor currently has or null if no such value exists.The method is thread-safe.
- Returns:
- a single value this processor currently has or null if no such value exists
-