T
- the value typepublic abstract class ParallelFlowable<T>
extends java.lang.Object
Use from()
to start processing a regular Publisher in 'rails'.
Use runOn()
to introduce where each 'rail' should run on thread-vise.
Use sequential()
to merge the sources back into a single Flowable.
Constructor and Description |
---|
ParallelFlowable() |
Modifier and Type | Method and Description |
---|---|
<C> ParallelFlowable<C> |
collect(java.util.concurrent.Callable<? extends C> collectionSupplier,
BiConsumer<? super C,? super T> collector)
Collect the elements in each rail into a collection supplied via a collectionSupplier
and collected into with a collector action, emitting the collection at the end.
|
<U> ParallelFlowable<U> |
compose(ParallelTransformer<T,U> composer)
Allows composing operators, in assembly time, on top of this ParallelFlowable
and returns another ParallelFlowable with composed features.
|
<R> ParallelFlowable<R> |
concatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
Generates and concatenates Publishers on each 'rail', signalling errors immediately
and generating 2 publishers upfront.
|
<R> ParallelFlowable<R> |
concatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
int prefetch)
Generates and concatenates Publishers on each 'rail', signalling errors immediately
and using the given prefetch amount for generating Publishers upfront.
|
<R> ParallelFlowable<R> |
concatMapDelayError(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
boolean tillTheEnd)
Generates and concatenates Publishers on each 'rail', optionally delaying errors
and generating 2 publishers upfront.
|
<R> ParallelFlowable<R> |
concatMapDelayError(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
int prefetch,
boolean tillTheEnd)
Generates and concatenates Publishers on each 'rail', optionally delaying errors
and using the given prefetch amount for generating Publishers upfront.
|
ParallelFlowable<T> |
doAfterNext(Consumer<? super T> onAfterNext)
Call the specified consumer with the current element passing through any 'rail'
after it has been delivered to downstream within the rail.
|
ParallelFlowable<T> |
doAfterTerminated(Action onAfterTerminate)
Run the specified Action when a 'rail' completes or signals an error.
|
ParallelFlowable<T> |
doOnCancel(Action onCancel)
Run the specified Action when a 'rail' receives a cancellation.
|
ParallelFlowable<T> |
doOnComplete(Action onComplete)
Run the specified Action when a 'rail' completes.
|
ParallelFlowable<T> |
doOnError(Consumer<java.lang.Throwable> onError)
Call the specified consumer with the exception passing through any 'rail'.
|
ParallelFlowable<T> |
doOnNext(Consumer<? super T> onNext)
Call the specified consumer with the current element passing through any 'rail'.
|
ParallelFlowable<T> |
doOnNext(Consumer<? super T> onNext,
BiFunction<? super java.lang.Long,? super java.lang.Throwable,ParallelFailureHandling> errorHandler)
Call the specified consumer with the current element passing through any 'rail' and
handles errors based on the returned value by the handler function.
|
ParallelFlowable<T> |
doOnNext(Consumer<? super T> onNext,
ParallelFailureHandling errorHandler)
Call the specified consumer with the current element passing through any 'rail' and
handles errors based on the given
ParallelFailureHandling enumeration value. |
ParallelFlowable<T> |
doOnRequest(LongConsumer onRequest)
Call the specified consumer with the request amount if any rail receives a request.
|
ParallelFlowable<T> |
doOnSubscribe(Consumer<? super org.reactivestreams.Subscription> onSubscribe)
Call the specified callback when a 'rail' receives a Subscription from its upstream.
|
ParallelFlowable<T> |
filter(Predicate<? super T> predicate)
Filters the source values on each 'rail'.
|
ParallelFlowable<T> |
filter(Predicate<? super T> predicate,
BiFunction<? super java.lang.Long,? super java.lang.Throwable,ParallelFailureHandling> errorHandler)
Filters the source values on each 'rail' and
handles errors based on the returned value by the handler function.
|
ParallelFlowable<T> |
filter(Predicate<? super T> predicate,
ParallelFailureHandling errorHandler)
Filters the source values on each 'rail' and
handles errors based on the given
ParallelFailureHandling enumeration value. |
<R> ParallelFlowable<R> |
flatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
Generates and flattens Publishers on each 'rail'.
|
<R> ParallelFlowable<R> |
flatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
boolean delayError)
Generates and flattens Publishers on each 'rail', optionally delaying errors.
|
<R> ParallelFlowable<R> |
flatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
boolean delayError,
int maxConcurrency)
Generates and flattens Publishers on each 'rail', optionally delaying errors
and having a total number of simultaneous subscriptions to the inner Publishers.
|
<R> ParallelFlowable<R> |
flatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper,
boolean delayError,
int maxConcurrency,
int prefetch)
Generates and flattens Publishers on each 'rail', optionally delaying errors,
having a total number of simultaneous subscriptions to the inner Publishers
and using the given prefetch amount for the inner Publishers.
|
static <T> ParallelFlowable<T> |
from(org.reactivestreams.Publisher<? extends T> source)
Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs)
in a round-robin fashion.
|
static <T> ParallelFlowable<T> |
from(org.reactivestreams.Publisher<? extends T> source,
int parallelism)
Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion.
|
static <T> ParallelFlowable<T> |
from(org.reactivestreams.Publisher<? extends T> source,
int parallelism,
int prefetch)
Take a Publisher and prepare to consume it on parallelism number of 'rails' ,
possibly ordered and round-robin fashion and use custom prefetch amount and queue
for dealing with the source Publisher's values.
|
static <T> ParallelFlowable<T> |
fromArray(org.reactivestreams.Publisher<T>... publishers)
Wraps multiple Publishers into a ParallelFlowable which runs them
in parallel and unordered.
|
<R> ParallelFlowable<R> |
map(Function<? super T,? extends R> mapper)
Maps the source values on each 'rail' to another value.
|
<R> ParallelFlowable<R> |
map(Function<? super T,? extends R> mapper,
BiFunction<? super java.lang.Long,? super java.lang.Throwable,ParallelFailureHandling> errorHandler)
Maps the source values on each 'rail' to another value and
handles errors based on the returned value by the handler function.
|
<R> ParallelFlowable<R> |
map(Function<? super T,? extends R> mapper,
ParallelFailureHandling errorHandler)
Maps the source values on each 'rail' to another value and
handles errors based on the given
ParallelFailureHandling enumeration value. |
abstract int |
parallelism()
Returns the number of expected parallel Subscribers.
|
Flowable<T> |
reduce(BiFunction<T,T,T> reducer)
Reduces all values within a 'rail' and across 'rails' with a reducer function into a single
sequential value.
|
<R> ParallelFlowable<R> |
reduce(java.util.concurrent.Callable<R> initialSupplier,
BiFunction<R,? super T,R> reducer)
Reduces all values within a 'rail' to a single value (with a possibly different type) via
a reducer function that is initialized on each rail from an initialSupplier value.
|
ParallelFlowable<T> |
runOn(Scheduler scheduler)
Specifies where each 'rail' will observe its incoming values with
no work-stealing and default prefetch amount.
|
ParallelFlowable<T> |
runOn(Scheduler scheduler,
int prefetch)
Specifies where each 'rail' will observe its incoming values with
possibly work-stealing and a given prefetch amount.
|
Flowable<T> |
sequential()
Merges the values from each 'rail' in a round-robin or same-order fashion and
exposes it as a regular Publisher sequence, running with a default prefetch value
for the rails.
|
Flowable<T> |
sequential(int prefetch)
Merges the values from each 'rail' in a round-robin or same-order fashion and
exposes it as a regular Publisher sequence, running with a give prefetch value
for the rails.
|
Flowable<T> |
sequentialDelayError()
Merges the values from each 'rail' in a round-robin or same-order fashion and
exposes it as a regular Flowable sequence, running with a default prefetch value
for the rails and delaying errors from all rails till all terminate.
|
Flowable<T> |
sequentialDelayError(int prefetch)
Merges the values from each 'rail' in a round-robin or same-order fashion and
exposes it as a regular Publisher sequence, running with a give prefetch value
for the rails and delaying errors from all rails till all terminate.
|
Flowable<T> |
sorted(java.util.Comparator<? super T> comparator)
Sorts the 'rails' of this ParallelFlowable and returns a Publisher that sequentially
picks the smallest next value from the rails.
|
Flowable<T> |
sorted(java.util.Comparator<? super T> comparator,
int capacityHint)
Sorts the 'rails' of this ParallelFlowable and returns a Publisher that sequentially
picks the smallest next value from the rails.
|
abstract void |
subscribe(org.reactivestreams.Subscriber<? super T>[] subscribers)
Subscribes an array of Subscribers to this ParallelFlowable and triggers
the execution chain for all 'rails'.
|
<U> U |
to(Function<? super ParallelFlowable<T>,U> converter)
Perform a fluent transformation to a value via a converter function which
receives this ParallelFlowable.
|
Flowable<java.util.List<T>> |
toSortedList(java.util.Comparator<? super T> comparator)
Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.
|
Flowable<java.util.List<T>> |
toSortedList(java.util.Comparator<? super T> comparator,
int capacityHint)
Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.
|
protected boolean |
validate(org.reactivestreams.Subscriber<?>[] subscribers)
Validates the number of subscribers and returns true if their number
matches the parallelism level of this ParallelFlowable.
|
public abstract void subscribe(@NonNull org.reactivestreams.Subscriber<? super T>[] subscribers)
subscribers
- the subscribers array to run in parallel, the number
of items must be equal to the parallelism level of this ParallelFlowableparallelism()
public abstract int parallelism()
protected final boolean validate(@NonNull org.reactivestreams.Subscriber<?>[] subscribers)
subscribers
- the array of Subscribers@CheckReturnValue public static <T> ParallelFlowable<T> from(@NonNull org.reactivestreams.Publisher<? extends T> source)
T
- the value typesource
- the source Publisher@CheckReturnValue public static <T> ParallelFlowable<T> from(@NonNull org.reactivestreams.Publisher<? extends T> source, int parallelism)
T
- the value typesource
- the source Publisherparallelism
- the number of parallel rails@CheckReturnValue public static <T> ParallelFlowable<T> from(@NonNull org.reactivestreams.Publisher<? extends T> source, int parallelism, int prefetch)
T
- the value typesource
- the source Publisherparallelism
- the number of parallel railsprefetch
- the number of values to prefetch from the source
the source until there is a rail ready to process it.@CheckReturnValue public final <R> ParallelFlowable<R> map(@NonNull Function<? super T,? extends R> mapper)
Note that the same mapper function may be called from multiple threads concurrently.
R
- the output value typemapper
- the mapper function turning Ts into Us.@CheckReturnValue public final <R> ParallelFlowable<R> map(@NonNull Function<? super T,? extends R> mapper, @NonNull ParallelFailureHandling errorHandler)
ParallelFailureHandling
enumeration value.
Note that the same mapper function may be called from multiple threads concurrently.
R
- the output value typemapper
- the mapper function turning Ts into Us.errorHandler
- the enumeration that defines how to handle errors thrown
from the mapper function@CheckReturnValue public final <R> ParallelFlowable<R> map(@NonNull Function<? super T,? extends R> mapper, @NonNull BiFunction<? super java.lang.Long,? super java.lang.Throwable,ParallelFailureHandling> errorHandler)
Note that the same mapper function may be called from multiple threads concurrently.
R
- the output value typemapper
- the mapper function turning Ts into Us.errorHandler
- the function called with the current repeat count and
failure Throwable and should return one of the ParallelFailureHandling
enumeration values to indicate how to proceed.@CheckReturnValue public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate)
Note that the same predicate may be called from multiple threads concurrently.
predicate
- the function returning true to keep a value or false to drop a value@CheckReturnValue public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull ParallelFailureHandling errorHandler)
ParallelFailureHandling
enumeration value.
Note that the same predicate may be called from multiple threads concurrently.
predicate
- the function returning true to keep a value or false to drop a valueerrorHandler
- the enumeration that defines how to handle errors thrown
from the predicate@CheckReturnValue public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull BiFunction<? super java.lang.Long,? super java.lang.Throwable,ParallelFailureHandling> errorHandler)
Note that the same predicate may be called from multiple threads concurrently.
predicate
- the function returning true to keep a value or false to drop a valueerrorHandler
- the function called with the current repeat count and
failure Throwable and should return one of the ParallelFailureHandling
enumeration values to indicate how to proceed.@CheckReturnValue public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler)
This operator uses the default prefetch size returned by Flowable.bufferSize()
.
The operator will call Scheduler.createWorker()
as many
times as this ParallelFlowable's parallelism level is.
No assumptions are made about the Scheduler's parallelism level, if the Scheduler's parallelism level is lower than the ParallelFlowable's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
scheduler
- the scheduler to use@CheckReturnValue public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch)
This operator uses the default prefetch size returned by Flowable.bufferSize()
.
The operator will call Scheduler.createWorker()
as many
times as this ParallelFlowable's parallelism level is.
No assumptions are made about the Scheduler's parallelism level, if the Scheduler's parallelism level is lower than the ParallelFlowable's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
scheduler
- the scheduler to use
that rail's worker has run out of work.prefetch
- the number of values to request on each 'rail' from the source@CheckReturnValue public final Flowable<T> reduce(@NonNull BiFunction<T,T,T> reducer)
Note that the same reducer function may be called from multiple threads concurrently.
reducer
- the function to reduce two values into one.@CheckReturnValue public final <R> ParallelFlowable<R> reduce(@NonNull java.util.concurrent.Callable<R> initialSupplier, @NonNull BiFunction<R,? super T,R> reducer)
Note that the same mapper function may be called from multiple threads concurrently.
R
- the reduced output typeinitialSupplier
- the supplier for the initial valuereducer
- the function to reduce a previous output of reduce (or the initial value supplied)
with a current source value.@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue public final Flowable<T> sequential()
This operator uses the default prefetch size returned by Flowable.bufferSize()
.
sequential
does not operate by default on a particular Scheduler
.sequential(int)
,
sequentialDelayError()
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue public final Flowable<T> sequential(int prefetch)
sequential
does not operate by default on a particular Scheduler
.prefetch
- the prefetch amount to use for each railsequential()
,
sequentialDelayError(int)
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue public final Flowable<T> sequentialDelayError()
This operator uses the default prefetch size returned by Flowable.bufferSize()
.
sequentialDelayError
does not operate by default on a particular Scheduler
.sequentialDelayError(int)
,
sequential()
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue public final Flowable<T> sequentialDelayError(int prefetch)
sequentialDelayError
does not operate by default on a particular Scheduler
.prefetch
- the prefetch amount to use for each railsequential()
,
sequentialDelayError()
@CheckReturnValue public final Flowable<T> sorted(@NonNull java.util.Comparator<? super T> comparator)
This operator requires a finite source ParallelFlowable.
comparator
- the comparator to use@CheckReturnValue public final Flowable<T> sorted(@NonNull java.util.Comparator<? super T> comparator, int capacityHint)
This operator requires a finite source ParallelFlowable.
comparator
- the comparator to usecapacityHint
- the expected number of total elements@CheckReturnValue public final Flowable<java.util.List<T>> toSortedList(@NonNull java.util.Comparator<? super T> comparator)
This operator requires a finite source ParallelFlowable.
comparator
- the comparator to compare elements@CheckReturnValue public final Flowable<java.util.List<T>> toSortedList(@NonNull java.util.Comparator<? super T> comparator, int capacityHint)
This operator requires a finite source ParallelFlowable.
comparator
- the comparator to compare elementscapacityHint
- the expected number of total elements@CheckReturnValue public final ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext)
onNext
- the callback@CheckReturnValue public final ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext, @NonNull ParallelFailureHandling errorHandler)
ParallelFailureHandling
enumeration value.onNext
- the callbackerrorHandler
- the enumeration that defines how to handle errors thrown
from the onNext consumer@CheckReturnValue public final ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext, @NonNull BiFunction<? super java.lang.Long,? super java.lang.Throwable,ParallelFailureHandling> errorHandler)
onNext
- the callbackerrorHandler
- the function called with the current repeat count and
failure Throwable and should return one of the ParallelFailureHandling
enumeration values to indicate how to proceed.@CheckReturnValue public final ParallelFlowable<T> doAfterNext(@NonNull Consumer<? super T> onAfterNext)
onAfterNext
- the callback@CheckReturnValue public final ParallelFlowable<T> doOnError(@NonNull Consumer<java.lang.Throwable> onError)
onError
- the callback@CheckReturnValue public final ParallelFlowable<T> doOnComplete(@NonNull Action onComplete)
onComplete
- the callback@CheckReturnValue public final ParallelFlowable<T> doAfterTerminated(@NonNull Action onAfterTerminate)
onAfterTerminate
- the callback@CheckReturnValue public final ParallelFlowable<T> doOnSubscribe(@NonNull Consumer<? super org.reactivestreams.Subscription> onSubscribe)
onSubscribe
- the callback@CheckReturnValue public final ParallelFlowable<T> doOnRequest(@NonNull LongConsumer onRequest)
onRequest
- the callback@CheckReturnValue public final ParallelFlowable<T> doOnCancel(@NonNull Action onCancel)
onCancel
- the callback@CheckReturnValue public final <C> ParallelFlowable<C> collect(@NonNull java.util.concurrent.Callable<? extends C> collectionSupplier, @NonNull BiConsumer<? super C,? super T> collector)
C
- the collection typecollectionSupplier
- the supplier of the collection in each railcollector
- the collector, taking the per-rail collection and the current item@CheckReturnValue public static <T> ParallelFlowable<T> fromArray(@NonNull org.reactivestreams.Publisher<T>... publishers)
T
- the value typepublishers
- the array of publishers@CheckReturnValue public final <U> U to(@NonNull Function<? super ParallelFlowable<T>,U> converter)
U
- the output value typeconverter
- the converter function from ParallelFlowable to some type@CheckReturnValue public final <U> ParallelFlowable<U> compose(@NonNull ParallelTransformer<T,U> composer)
U
- the output value typecomposer
- the composer function from ParallelFlowable (this) to another ParallelFlowable@CheckReturnValue public final <R> ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
Errors are not delayed and uses unbounded concurrency along with default inner prefetch.
R
- the result typemapper
- the function to map each rail's value into a Publisher@CheckReturnValue public final <R> ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, boolean delayError)
It uses unbounded concurrency along with default inner prefetch.
R
- the result typemapper
- the function to map each rail's value into a PublisherdelayError
- should the errors from the main and the inner sources delayed till everybody terminates?@CheckReturnValue public final <R> ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency)
It uses a default inner prefetch.
R
- the result typemapper
- the function to map each rail's value into a PublisherdelayError
- should the errors from the main and the inner sources delayed till everybody terminates?maxConcurrency
- the maximum number of simultaneous subscriptions to the generated inner Publishers@CheckReturnValue public final <R> ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch)
R
- the result typemapper
- the function to map each rail's value into a PublisherdelayError
- should the errors from the main and the inner sources delayed till everybody terminates?maxConcurrency
- the maximum number of simultaneous subscriptions to the generated inner Publishersprefetch
- the number of items to prefetch from each inner Publisher@CheckReturnValue public final <R> ParallelFlowable<R> concatMap(@NonNull Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
R
- the result typemapper
- the function to map each rail's value into a Publisher
source and the inner Publishers (immediate, boundary, end)@CheckReturnValue public final <R> ParallelFlowable<R> concatMap(@NonNull Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, int prefetch)
R
- the result typemapper
- the function to map each rail's value into a Publisherprefetch
- the number of items to prefetch from each inner Publisher
source and the inner Publishers (immediate, boundary, end)@CheckReturnValue public final <R> ParallelFlowable<R> concatMapDelayError(@NonNull Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, boolean tillTheEnd)
R
- the result typemapper
- the function to map each rail's value into a PublishertillTheEnd
- if true all errors from the upstream and inner Publishers are delayed
till all of them terminate, if false, the error is emitted when an inner Publisher terminates.
source and the inner Publishers (immediate, boundary, end)@CheckReturnValue public final <R> ParallelFlowable<R> concatMapDelayError(@NonNull Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, int prefetch, boolean tillTheEnd)
R
- the result typemapper
- the function to map each rail's value into a Publisherprefetch
- the number of items to prefetch from each inner PublishertillTheEnd
- if true all errors from the upstream and inner Publishers are delayed
till all of them terminate, if false, the error is emitted when an inner Publisher terminates.