T
- the value type
public abstract class ParallelFlowable<T> extends Object
Use from() to start processing a regular Publisher in 'rails'.
Use runOn() to specify the thread on which each 'rail' should run.
Use sequential() to merge the rails back into a single Flowable.
History: 2.0.5 - experimental; 2.1 - beta
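As a rough illustration of the from() / runOn() / sequential() flow described above, the following sketch squares a range of integers on parallel rails. It assumes the RxJava 3 artifact (package io.reactivex.rxjava3) is on the classpath; the class name ParallelPipelineSketch and the squares helper are made up for this example. The merge order across rails is not guaranteed, so the helper sorts the result before returning it.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical example class; assumes RxJava 3 on the classpath.
class ParallelPipelineSketch {

    /** Squares 1..n on parallel rails and returns the results in ascending order. */
    static List<Integer> squares(int n) {
        List<Integer> merged = ParallelFlowable.from(Flowable.range(1, n)) // split the source into rails
                .runOn(Schedulers.computation())                           // each rail gets its own worker
                .map(v -> v * v)                                           // may run concurrently across rails
                .sequential()                                              // merge the rails into one Flowable
                .toList()
                .blockingGet();
        // The merge order across rails is not guaranteed, so sort a copy.
        List<Integer> sorted = new ArrayList<>(merged);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(squares(5)); // prints [1, 4, 9, 16, 25]
    }
}
```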
Constructor and Description |
---|
ParallelFlowable() |
Modifier and Type | Method and Description |
---|---|
<C> ParallelFlowable<C> | collect(Supplier<? extends C> collectionSupplier, BiConsumer<? super C,? super T> collector) Collects the elements in each rail into a collection supplied via a collectionSupplier and filled by a collector action, emitting the collection at the end. |
<U> ParallelFlowable<U> | compose(ParallelTransformer<T,U> composer) Allows composing operators, at assembly time, on top of this ParallelFlowable and returns another ParallelFlowable with composed features. |
<R> ParallelFlowable<R> | concatMap(Function<? super T,? extends Publisher<? extends R>> mapper) Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront. |
<R> ParallelFlowable<R> | concatMap(Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch) Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront. |
<R> ParallelFlowable<R> | concatMapDelayError(Function<? super T,? extends Publisher<? extends R>> mapper, boolean tillTheEnd) Generates and concatenates Publishers on each 'rail', optionally delaying errors and generating 2 publishers upfront. |
<R> ParallelFlowable<R> | concatMapDelayError(Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch, boolean tillTheEnd) Generates and concatenates Publishers on each 'rail', optionally delaying errors and using the given prefetch amount for generating Publishers upfront. |
ParallelFlowable<T> | doAfterNext(Consumer<? super T> onAfterNext) Calls the specified consumer with the current element passing through any 'rail' after it has been delivered to downstream within the rail. |
ParallelFlowable<T> | doAfterTerminated(Action onAfterTerminate) Runs the specified Action when a 'rail' completes or signals an error. |
ParallelFlowable<T> | doOnCancel(Action onCancel) Runs the specified Action when a 'rail' receives a cancellation. |
ParallelFlowable<T> | doOnComplete(Action onComplete) Runs the specified Action when a 'rail' completes. |
ParallelFlowable<T> | doOnError(Consumer<Throwable> onError) Calls the specified consumer with the exception passing through any 'rail'. |
ParallelFlowable<T> | doOnNext(Consumer<? super T> onNext) Calls the specified consumer with the current element passing through any 'rail'. |
ParallelFlowable<T> | doOnNext(Consumer<? super T> onNext, BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler) Calls the specified consumer with the current element passing through any 'rail' and handles errors based on the value returned by the handler function. |
ParallelFlowable<T> | doOnNext(Consumer<? super T> onNext, ParallelFailureHandling errorHandler) Calls the specified consumer with the current element passing through any 'rail' and handles errors based on the given ParallelFailureHandling enumeration value. |
ParallelFlowable<T> | doOnRequest(LongConsumer onRequest) Calls the specified consumer with the request amount if any rail receives a request. |
ParallelFlowable<T> | doOnSubscribe(Consumer<? super Subscription> onSubscribe) Calls the specified callback when a 'rail' receives a Subscription from its upstream. |
ParallelFlowable<T> | filter(Predicate<? super T> predicate) Filters the source values on each 'rail'. |
ParallelFlowable<T> | filter(Predicate<? super T> predicate, BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler) Filters the source values on each 'rail' and handles errors based on the value returned by the handler function. |
ParallelFlowable<T> | filter(Predicate<? super T> predicate, ParallelFailureHandling errorHandler) Filters the source values on each 'rail' and handles errors based on the given ParallelFailureHandling enumeration value. |
<R> ParallelFlowable<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper) Generates and flattens Publishers on each 'rail'. |
<R> ParallelFlowable<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError) Generates and flattens Publishers on each 'rail', optionally delaying errors. |
<R> ParallelFlowable<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency) Generates and flattens Publishers on each 'rail', optionally delaying errors and limiting the total number of simultaneous subscriptions to the inner Publishers. |
<R> ParallelFlowable<R> | flatMap(Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch) Generates and flattens Publishers on each 'rail', optionally delaying errors, limiting the total number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers. |
static <T> ParallelFlowable<T> | from(Publisher<? extends T> source) Takes a Publisher and prepares to consume it on multiple 'rails' (number of CPUs) in a round-robin fashion. |
static <T> ParallelFlowable<T> | from(Publisher<? extends T> source, int parallelism) Takes a Publisher and prepares to consume it on parallelism number of 'rails' in a round-robin fashion. |
static <T> ParallelFlowable<T> | from(Publisher<? extends T> source, int parallelism, int prefetch) Takes a Publisher and prepares to consume it on parallelism number of 'rails', possibly ordered and in a round-robin fashion, using a custom prefetch amount and queue for dealing with the source Publisher's values. |
static <T> ParallelFlowable<T> | fromArray(Publisher<T>... publishers) Wraps multiple Publishers into a ParallelFlowable which runs them in parallel and unordered. |
<R> ParallelFlowable<R> | map(Function<? super T,? extends R> mapper) Maps the source values on each 'rail' to another value. |
<R> ParallelFlowable<R> | map(Function<? super T,? extends R> mapper, BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler) Maps the source values on each 'rail' to another value and handles errors based on the value returned by the handler function. |
<R> ParallelFlowable<R> | map(Function<? super T,? extends R> mapper, ParallelFailureHandling errorHandler) Maps the source values on each 'rail' to another value and handles errors based on the given ParallelFailureHandling enumeration value. |
abstract int | parallelism() Returns the number of expected parallel Subscribers. |
Flowable<T> | reduce(BiFunction<T,T,T> reducer) Reduces all values within a 'rail' and across 'rails' with a reducer function into a single sequential value. |
<R> ParallelFlowable<R> | reduce(Supplier<R> initialSupplier, BiFunction<R,? super T,R> reducer) Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value. |
ParallelFlowable<T> | runOn(Scheduler scheduler) Specifies where each 'rail' will observe its incoming values with no work-stealing and default prefetch amount. |
ParallelFlowable<T> | runOn(Scheduler scheduler, int prefetch) Specifies where each 'rail' will observe its incoming values with possible work-stealing and a given prefetch amount. |
Flowable<T> | sequential() Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a default prefetch value for the rails. |
Flowable<T> | sequential(int prefetch) Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a given prefetch value for the rails. |
Flowable<T> | sequentialDelayError() Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a default prefetch value for the rails and delaying errors from all rails till all terminate. |
Flowable<T> | sequentialDelayError(int prefetch) Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Publisher sequence, running with a given prefetch value for the rails and delaying errors from all rails till all terminate. |
Flowable<T> | sorted(Comparator<? super T> comparator) Sorts the 'rails' of this ParallelFlowable and returns a Publisher that sequentially picks the smallest next value from the rails. |
Flowable<T> | sorted(Comparator<? super T> comparator, int capacityHint) Sorts the 'rails' of this ParallelFlowable and returns a Publisher that sequentially picks the smallest next value from the rails. |
abstract void | subscribe(Subscriber<? super T>[] subscribers) Subscribes an array of Subscribers to this ParallelFlowable and triggers the execution chain for all 'rails'. |
<R> R | to(ParallelFlowableConverter<T,R> converter) Calls the specified converter function during assembly time and returns its resulting value. |
Flowable<List<T>> | toSortedList(Comparator<? super T> comparator) Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher. |
Flowable<List<T>> | toSortedList(Comparator<? super T> comparator, int capacityHint) Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher. |
protected boolean | validate(Subscriber<?>[] subscribers) Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlowable. |
public abstract void subscribe(@NonNull Subscriber<? super T>[] subscribers)
subscribers
- the subscribers array to run in parallel; the number of items must be equal to the parallelism level of this ParallelFlowable
See also: parallelism()
public abstract int parallelism()
protected final boolean validate(@NonNull Subscriber<?>[] subscribers)
subscribers
- the array of Subscribers
@CheckReturnValue public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source)
T
- the value type
source
- the source Publisher
@CheckReturnValue public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source, int parallelism)
T
- the value type
source
- the source Publisher
parallelism
- the number of parallel rails
@CheckReturnValue @NonNull public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source, int parallelism, int prefetch)
T
- the value type
source
- the source Publisher
parallelism
- the number of parallel rails
prefetch
- the number of values to prefetch from the source
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> map(@NonNull Function<? super T,? extends R> mapper)
Note that the same mapper function may be called from multiple threads concurrently.
R
- the output value type
mapper
- the mapper function turning Ts into Rs.
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> map(@NonNull Function<? super T,? extends R> mapper, @NonNull ParallelFailureHandling errorHandler)
Maps the source values on each 'rail' to another value and handles errors based on the given ParallelFailureHandling enumeration value.
Note that the same mapper function may be called from multiple threads concurrently.
History: 2.0.8 - experimental
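To make the enumeration-based error handling more concrete, here is a minimal sketch that skips items for which the mapper throws. It assumes RxJava 3 (package io.reactivex.rxjava3) on the classpath; MapSkipSketch and divideInto100 are hypothetical names chosen for this example.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelFailureHandling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical example class; assumes RxJava 3 on the classpath.
class MapSkipSketch {

    /** Divides 100 by each input; inputs for which the mapper throws are dropped. */
    static List<Integer> divideInto100(Integer... inputs) {
        List<Integer> merged = Flowable.fromArray(inputs)
                .parallel(2)                                      // two rails
                .map(v -> 100 / v, ParallelFailureHandling.SKIP)  // ArithmeticException: skip the item
                .sequential()
                .toList()
                .blockingGet();
        List<Integer> sorted = new ArrayList<>(merged);           // normalise the merge order
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        // The 0 is skipped instead of failing the whole flow.
        System.out.println(divideInto100(1, 0, 4, 5)); // prints [20, 25, 100]
    }
}
```

With ParallelFailureHandling.ERROR in place of SKIP, the same input would be expected to terminate the flow with the ArithmeticException.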
R
- the output value type
mapper
- the mapper function turning Ts into Rs.
errorHandler
- the enumeration that defines how to handle errors thrown from the mapper function
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> map(@NonNull Function<? super T,? extends R> mapper, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler)
Note that the same mapper function may be called from multiple threads concurrently.
History: 2.0.8 - experimental
R
- the output value type
mapper
- the mapper function turning Ts into Rs.
errorHandler
- the function called with the current repeat count and failure Throwable; it should return one of the ParallelFailureHandling enumeration values to indicate how to proceed.
@CheckReturnValue public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate)
Note that the same predicate may be called from multiple threads concurrently.
predicate
- the function returning true to keep a value or false to drop a value
@CheckReturnValue public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull ParallelFailureHandling errorHandler)
Filters the source values on each 'rail' and handles errors based on the given ParallelFailureHandling enumeration value.
Note that the same predicate may be called from multiple threads concurrently.
History: 2.0.8 - experimental
predicate
- the function returning true to keep a value or false to drop a value
errorHandler
- the enumeration that defines how to handle errors thrown from the predicate
@CheckReturnValue public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler)
Note that the same predicate may be called from multiple threads concurrently.
History: 2.0.8 - experimental
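The handler-function variant can decide per failure how to proceed. The sketch below, which assumes RxJava 3 on the classpath, retries a failing item a few times and then skips it. The flaky predicate, the retry limit of 3, and the names FilterRetrySketch and evens are all invented for illustration.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelFailureHandling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; assumes RxJava 3 on the classpath.
class FilterRetrySketch {

    /** Keeps the even numbers in 1..n; the predicate throws twice in total to exercise the handler. */
    static List<Integer> evens(int n) {
        AtomicInteger calls = new AtomicInteger();
        List<Integer> merged = Flowable.range(1, n)
                .parallel(2)
                .filter(v -> {
                    // Made-up transient failure: the first two invocations throw.
                    if (calls.getAndIncrement() < 2) {
                        throw new IllegalStateException("transient failure");
                    }
                    return v % 2 == 0;
                }, (attempt, error) ->
                        // Retry the same item a few times, then give up on it.
                        attempt <= 3 ? ParallelFailureHandling.RETRY : ParallelFailureHandling.SKIP)
                .sequential()
                .toList()
                .blockingGet();
        List<Integer> sorted = new ArrayList<>(merged); // normalise the merge order
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(evens(6)); // prints [2, 4, 6]
    }
}
```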
predicate
- the function returning true to keep a value or false to drop a value
errorHandler
- the function called with the current repeat count and failure Throwable; it should return one of the ParallelFailureHandling enumeration values to indicate how to proceed.
@CheckReturnValue @NonNull public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler)
This operator uses the default prefetch size returned by Flowable.bufferSize().
The operator will call Scheduler.createWorker() as many times as this ParallelFlowable's parallelism level.
No assumptions are made about the Scheduler's parallelism level; if it is lower than the ParallelFlowable's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
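The remark about rails sharing a thread can be observed with a scheduler that has fewer threads than there are rails. The following sketch records the thread names seen by the rails. It assumes RxJava 3 on the classpath and relies on Schedulers.single() being backed by one shared thread; RunOnThreadsSketch and its helpers are hypothetical names.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical example class; assumes RxJava 3 on the classpath.
class RunOnThreadsSketch {

    /** Returns the names of the threads on which the rails observed their values. */
    static Set<String> threadsUsed(Scheduler scheduler, int rails) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Flowable.range(1, 1_000)
                .parallel(rails)
                .runOn(scheduler) // one createWorker() call per rail
                .doOnNext(v -> names.add(Thread.currentThread().getName()))
                .sequential()
                .blockingSubscribe(); // wait until every rail has completed
        return names;
    }

    /** Four rails on the single-threaded scheduler: all of them share one thread. */
    static int threadCountOnSingleScheduler() {
        return threadsUsed(Schedulers.single(), 4).size();
    }

    public static void main(String[] args) {
        System.out.println(threadCountOnSingleScheduler());
        System.out.println(threadsUsed(Schedulers.computation(), 4));
    }
}
```

On Schedulers.computation() the set will usually contain several names, up to the number of rails, depending on the machine.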
scheduler
- the scheduler to use
@CheckReturnValue @NonNull public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch)
This overload uses the given prefetch amount instead of the default returned by Flowable.bufferSize().
The operator will call Scheduler.createWorker() as many times as this ParallelFlowable's parallelism level.
No assumptions are made about the Scheduler's parallelism level; if it is lower than the ParallelFlowable's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
scheduler
- the scheduler to use
prefetch
- the number of values to request on each 'rail' from the source
@CheckReturnValue @NonNull public final Flowable<T> reduce(@NonNull BiFunction<T,T,T> reducer)
Note that the same reducer function may be called from multiple threads concurrently.
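Because values are reduced first within each rail and then across rails, the reducer should give the same result for any grouping of the inputs. A small sketch using addition, assuming RxJava 3 on the classpath and hypothetical names ReduceSumSketch and sum:

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

// Hypothetical example class; assumes RxJava 3 on the classpath.
class ReduceSumSketch {

    /** Sums 1..n (n >= 1) across all rails; addition is associative, so the grouping does not matter. */
    static int sum(int n) {
        return Flowable.range(1, n)
                .parallel()                      // default parallelism (number of CPUs)
                .runOn(Schedulers.computation())
                .reduce((a, b) -> a + b)         // Flowable<Integer> carrying the single result
                .blockingSingle();
    }

    public static void main(String[] args) {
        System.out.println(sum(100)); // prints 5050
    }
}
```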
reducer
- the function to reduce two values into one.
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> reduce(@NonNull Supplier<R> initialSupplier, @NonNull BiFunction<R,? super T,R> reducer)
Note that the same reducer function may be called from multiple threads concurrently.
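Unlike the single-argument overload, this variant stays parallel and yields one reduced value per rail. The sketch below counts the items seen by each rail. It assumes RxJava 3 on the classpath; PerRailReduceSketch and countsPerRail are hypothetical names, and how the items are distributed among the rails should not be relied upon.

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.List;

// Hypothetical example class; assumes RxJava 3 on the classpath.
class PerRailReduceSketch {

    /** Counts how many items each rail received; the result holds one count per rail. */
    static List<Integer> countsPerRail(int n, int rails) {
        return Flowable.range(1, n)
                .parallel(rails)
                .reduce(() -> 0, (count, v) -> count + 1) // fresh accumulator for every rail
                .sequential()                             // one value per rail is merged here
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // Three counts that add up to 10; the exact split is an implementation detail.
        System.out.println(countsPerRail(10, 3));
    }
}
```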
R
- the reduced output type
initialSupplier
- the supplier for the initial value
reducer
- the function to reduce a previous output of reduce (or the initial value supplied) with a current source value.
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue public final Flowable<T> sequential()
This operator uses the default prefetch size returned by Flowable.bufferSize().
sequential does not operate by default on a particular Scheduler.
See also: sequential(int), sequentialDelayError()
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue @NonNull public final Flowable<T> sequential(int prefetch)
sequential does not operate by default on a particular Scheduler.
prefetch
- the prefetch amount to use for each rail
See also: sequential(), sequentialDelayError(int)
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue @NonNull public final Flowable<T> sequentialDelayError()
This operator uses the default prefetch size returned by Flowable.bufferSize().
sequentialDelayError does not operate by default on a particular Scheduler.
History: 2.0.7 - experimental
See also: sequentialDelayError(int), sequential()
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue @NonNull public final Flowable<T> sequentialDelayError(int prefetch)
sequentialDelayError does not operate by default on a particular Scheduler.
History: 2.0.7 - experimental
prefetch
- the prefetch amount to use for each rail
See also: sequential(), sequentialDelayError()
@CheckReturnValue @NonNull public final Flowable<T> sorted(@NonNull Comparator<? super T> comparator)
This operator requires a finite source ParallelFlowable.
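A short sketch of the two sorting terminal operators, sorted and toSortedList, on a small finite source. It assumes RxJava 3 on the classpath; SortedSketch and its helper methods are hypothetical names.

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.Comparator;
import java.util.List;

// Hypothetical example class; assumes RxJava 3 on the classpath.
class SortedSketch {

    /** Emits the values one by one in ascending order and collects them. */
    static List<Integer> ascending(Integer... values) {
        return Flowable.fromArray(values)
                .parallel(3)
                .sorted(Comparator.naturalOrder()) // rails are sorted, then merged smallest-first
                .toList()
                .blockingGet();
    }

    /** Emits a single, fully sorted list (here in descending order). */
    static List<Integer> descendingList(Integer... values) {
        return Flowable.fromArray(values)
                .parallel(3)
                .toSortedList(Comparator.reverseOrder())
                .blockingSingle();
    }

    public static void main(String[] args) {
        System.out.println(ascending(5, 3, 8, 1, 2));      // prints [1, 2, 3, 5, 8]
        System.out.println(descendingList(5, 3, 8, 1, 2)); // prints [8, 5, 3, 2, 1]
    }
}
```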
comparator
- the comparator to use
@CheckReturnValue @NonNull public final Flowable<T> sorted(@NonNull Comparator<? super T> comparator, int capacityHint)
This operator requires a finite source ParallelFlowable.
comparator
- the comparator to use
capacityHint
- the expected number of total elements
@CheckReturnValue @NonNull public final Flowable<List<T>> toSortedList(@NonNull Comparator<? super T> comparator)
This operator requires a finite source ParallelFlowable.
comparator
- the comparator to compare elements
@CheckReturnValue @NonNull public final Flowable<List<T>> toSortedList(@NonNull Comparator<? super T> comparator, int capacityHint)
This operator requires a finite source ParallelFlowable.
comparator
- the comparator to compare elements
capacityHint
- the expected number of total elements
@CheckReturnValue @NonNull public final ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext)
onNext
- the callback
@CheckReturnValue @NonNull public final ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext, @NonNull ParallelFailureHandling errorHandler)
Calls the specified consumer with the current element passing through any 'rail' and handles errors based on the given ParallelFailureHandling enumeration value.
History: 2.0.8 - experimental
onNext
- the callback
errorHandler
- the enumeration that defines how to handle errors thrown from the onNext consumer
@CheckReturnValue @NonNull public final ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler)
History: 2.0.8 - experimental
onNext
- the callback
errorHandler
- the function called with the current repeat count and failure Throwable; it should return one of the ParallelFailureHandling enumeration values to indicate how to proceed.
@CheckReturnValue @NonNull public final ParallelFlowable<T> doAfterNext(@NonNull Consumer<? super T> onAfterNext)
onAfterNext
- the callback
@CheckReturnValue @NonNull public final ParallelFlowable<T> doOnError(@NonNull Consumer<Throwable> onError)
onError
- the callback
@CheckReturnValue @NonNull public final ParallelFlowable<T> doOnComplete(@NonNull Action onComplete)
onComplete
- the callback
@CheckReturnValue @NonNull public final ParallelFlowable<T> doAfterTerminated(@NonNull Action onAfterTerminate)
onAfterTerminate
- the callback
@CheckReturnValue @NonNull public final ParallelFlowable<T> doOnSubscribe(@NonNull Consumer<? super Subscription> onSubscribe)
onSubscribe
- the callback
@CheckReturnValue @NonNull public final ParallelFlowable<T> doOnRequest(@NonNull LongConsumer onRequest)
onRequest
- the callback
@CheckReturnValue @NonNull public final ParallelFlowable<T> doOnCancel(@NonNull Action onCancel)
onCancel
- the callback
@CheckReturnValue @NonNull public final <C> ParallelFlowable<C> collect(@NonNull Supplier<? extends C> collectionSupplier, @NonNull BiConsumer<? super C,? super T> collector)
C
- the collection type
collectionSupplier
- the supplier of the collection in each rail
collector
- the collector, taking the per-rail collection and the current item
@CheckReturnValue @NonNull public static <T> ParallelFlowable<T> fromArray(@NonNull Publisher<T>... publishers)
T
- the value type
publishers
- the array of publishers
@CheckReturnValue @NonNull public final <R> R to(@NonNull ParallelFlowableConverter<T,R> converter)
This allows fluent conversion to any other type.
History: 2.1.7 - experimental
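As an illustration of fluent conversion, the converter below turns the parallel flow into a Single holding all values. This is a sketch assuming RxJava 3 on the classpath; ToConverterSketch and doubled are hypothetical names, and the lambda stands in for a ParallelFlowableConverter.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical example class; assumes RxJava 3 on the classpath.
class ToConverterSketch {

    /** Doubles 1..n on two rails and converts the parallel flow into a Single of a list. */
    static List<Integer> doubled(int n) {
        Single<List<Integer>> single = Flowable.range(1, n)
                .parallel(2)
                .map(v -> v * 2)
                .to(pf -> pf.sequential().toList()); // converter is invoked at assembly time
        List<Integer> sorted = new ArrayList<>(single.blockingGet());
        Collections.sort(sorted);                    // normalise the merge order
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(doubled(3)); // prints [2, 4, 6]
    }
}
```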
R
- the resulting object type
converter
- the function that receives the current ParallelFlowable instance and returns a value
NullPointerException
- if converter is null
@CheckReturnValue @NonNull public final <U> ParallelFlowable<U> compose(@NonNull ParallelTransformer<T,U> composer)
U
- the output value type
composer
- the composer function from ParallelFlowable (this) to another ParallelFlowable
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper)
Errors are not delayed, and this overload uses unbounded concurrency along with the default inner prefetch.
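A minimal flatMap sketch in which each value on a rail is expanded into a two-element inner Publisher. It assumes RxJava 3 on the classpath; FlatMapSketch and expand are hypothetical names, and the result is sorted because the interleaving of inner sources is not fixed.

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical example class; assumes RxJava 3 on the classpath.
class FlatMapSketch {

    /** Maps every v in 1..n to the pair (v, v * 10) and flattens the pairs. */
    static List<Integer> expand(int n) {
        List<Integer> merged = Flowable.range(1, n)
                .parallel(2)
                .flatMap(v -> Flowable.just(v, v * 10)) // inner Publisher per value, flattened per rail
                .sequential()
                .toList()
                .blockingGet();
        List<Integer> sorted = new ArrayList<>(merged); // normalise the merge order
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(expand(3)); // prints [1, 2, 3, 10, 20, 30]
    }
}
```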
R
- the result type
mapper
- the function to map each rail's value into a Publisher
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError)
It uses unbounded concurrency along with the default inner prefetch.
R
- the result type
mapper
- the function to map each rail's value into a Publisher
delayError
- whether errors from the main and the inner sources should be delayed until all of them terminate
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency)
It uses the default inner prefetch.
R
- the result type
mapper
- the function to map each rail's value into a Publisher
delayError
- whether errors from the main and the inner sources should be delayed until all of them terminate
maxConcurrency
- the maximum number of simultaneous subscriptions to the generated inner Publishers
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch)
R
- the result type
mapper
- the function to map each rail's value into a Publisher
delayError
- whether errors from the main and the inner sources should be delayed until all of them terminate
maxConcurrency
- the maximum number of simultaneous subscriptions to the generated inner Publishers
prefetch
- the number of items to prefetch from each inner Publisher
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> concatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper)
R
- the result type
mapper
- the function to map each rail's value into a Publisher
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> concatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch)
R
- the result type
mapper
- the function to map each rail's value into a Publisher
prefetch
- the number of items to prefetch from each inner Publisher
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> concatMapDelayError(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean tillTheEnd)
R
- the result type
mapper
- the function to map each rail's value into a Publisher
tillTheEnd
- if true, all errors from the upstream and inner Publishers are delayed till all of them terminate; if false, the error is emitted when an inner Publisher terminates.
@CheckReturnValue @NonNull public final <R> ParallelFlowable<R> concatMapDelayError(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch, boolean tillTheEnd)
R
- the result type
mapper
- the function to map each rail's value into a Publisher
prefetch
- the number of items to prefetch from each inner Publisher
tillTheEnd
- if true, all errors from the upstream and inner Publishers are delayed till all of them terminate; if false, the error is emitted when an inner Publisher terminates.