public abstract class ParallelFlowable<T> extends Object

Type Parameters:
T - the value type

Abstract base class for parallel publishing of events signaled to an array of Subscribers.

Use from(Publisher) to start processing a regular Publisher in 'rails'.
Use runOn(Scheduler) to specify where each 'rail' should run, thread-wise.
Use sequential() to merge the sources back into a single Flowable.

History: 2.0.5 - experimental; 2.1 - beta
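For example, a minimal sketch of the typical rail workflow (assuming RxJava 3's Flowable and Schedulers are on the classpath; Flowable.parallel() is a convenience for from(Publisher)):

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class RailsExample {
    public static void main(String[] args) {
        Flowable.range(1, 1000)
                .parallel()                       // same as ParallelFlowable.from(Flowable.range(1, 1000))
                .runOn(Schedulers.computation())  // each rail observes its values on a computation worker
                .map(v -> v * v)                  // runs concurrently, rail by rail
                .sequential()                     // merge the rails back into a single Flowable
                .blockingSubscribe(System.out::println);
    }
}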
Constructor and Description |
---|
ParallelFlowable() |
Modifier and Type | Method and Description |
---|---|
<A,R> @NonNull Flowable<R> | collect(@NonNull Collector<T,A,R> collector) Reduces all values within a 'rail' and across 'rails' with the callbacks of the given Collector into one Flowable containing a single value. |
<C> @NonNull ParallelFlowable<C> | collect(@NonNull Supplier<? extends C> collectionSupplier, @NonNull BiConsumer<? super C,? super T> collector) Collect the elements in each rail into a collection supplied via a collectionSupplier and collected into with a collector action, emitting the collection at the end. |
<U> @NonNull ParallelFlowable<U> | compose(@NonNull ParallelTransformer<T,U> composer) Allows composing operators, in assembly time, on top of this ParallelFlowable and returns another ParallelFlowable with composed features. |
<R> @NonNull ParallelFlowable<R> | concatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper) Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront. |
<R> @NonNull ParallelFlowable<R> | concatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch) Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront. |
<R> @NonNull ParallelFlowable<R> | concatMapDelayError(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean tillTheEnd) Generates and concatenates Publishers on each 'rail', optionally delaying errors and generating 2 publishers upfront. |
<R> @NonNull ParallelFlowable<R> | concatMapDelayError(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch, boolean tillTheEnd) Generates and concatenates Publishers on each 'rail', optionally delaying errors and using the given prefetch amount for generating Publishers upfront. |
@NonNull ParallelFlowable<T> | doAfterNext(@NonNull Consumer<? super T> onAfterNext) Call the specified consumer with the current element passing through any 'rail' after it has been delivered to downstream within the rail. |
@NonNull ParallelFlowable<T> | doAfterTerminated(@NonNull Action onAfterTerminate) Run the specified Action when a 'rail' completes or signals an error. |
@NonNull ParallelFlowable<T> | doOnCancel(@NonNull Action onCancel) Run the specified Action when a 'rail' receives a cancellation. |
@NonNull ParallelFlowable<T> | doOnComplete(@NonNull Action onComplete) Run the specified Action when a 'rail' completes. |
@NonNull ParallelFlowable<T> | doOnError(@NonNull Consumer<? super Throwable> onError) Call the specified consumer with the exception passing through any 'rail'. |
@NonNull ParallelFlowable<T> | doOnNext(@NonNull Consumer<? super T> onNext) Call the specified consumer with the current element passing through any 'rail'. |
@NonNull ParallelFlowable<T> | doOnNext(@NonNull Consumer<? super T> onNext, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler) Call the specified consumer with the current element passing through any 'rail' and handle errors based on the value returned by the handler function. |
@NonNull ParallelFlowable<T> | doOnNext(@NonNull Consumer<? super T> onNext, @NonNull ParallelFailureHandling errorHandler) Call the specified consumer with the current element passing through any 'rail' and handle errors based on the given ParallelFailureHandling enumeration value. |
@NonNull ParallelFlowable<T> | doOnRequest(@NonNull LongConsumer onRequest) Call the specified consumer with the request amount if any rail receives a request. |
@NonNull ParallelFlowable<T> | doOnSubscribe(@NonNull Consumer<? super Subscription> onSubscribe) Call the specified callback when a 'rail' receives a Subscription from its upstream. |
@NonNull ParallelFlowable<T> | filter(@NonNull Predicate<? super T> predicate) Filters the source values on each 'rail'. |
@NonNull ParallelFlowable<T> | filter(@NonNull Predicate<? super T> predicate, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler) Filters the source values on each 'rail' and handles errors based on the value returned by the handler function. |
@NonNull ParallelFlowable<T> | filter(@NonNull Predicate<? super T> predicate, @NonNull ParallelFailureHandling errorHandler) Filters the source values on each 'rail' and handles errors based on the given ParallelFailureHandling enumeration value. |
<R> @NonNull ParallelFlowable<R> | flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper) Generates and flattens Publishers on each 'rail'. |
<R> @NonNull ParallelFlowable<R> | flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError) Generates and flattens Publishers on each 'rail', optionally delaying errors. |
<R> @NonNull ParallelFlowable<R> | flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency) Generates and flattens Publishers on each 'rail', optionally delaying errors and having a total number of simultaneous subscriptions to the inner Publishers. |
<R> @NonNull ParallelFlowable<R> | flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch) Generates and flattens Publishers on each 'rail', optionally delaying errors, having a total number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers. |
<U> @NonNull ParallelFlowable<U> | flatMapIterable(@NonNull Function<? super T,? extends Iterable<? extends U>> mapper) Returns a ParallelFlowable that merges each item emitted by the source on each rail with the values in an Iterable corresponding to that item that is generated by a selector. |
<U> @NonNull ParallelFlowable<U> | flatMapIterable(@NonNull Function<? super T,? extends Iterable<? extends U>> mapper, int bufferSize) Returns a ParallelFlowable that merges each item emitted by the source ParallelFlowable with the values in an Iterable corresponding to that item that is generated by a selector. |
<R> @NonNull ParallelFlowable<R> | flatMapStream(@NonNull Function<? super T,? extends Stream<? extends R>> mapper) Maps each upstream item on each rail into a Stream and emits the Stream's items to the downstream in a sequential fashion. |
<R> @NonNull ParallelFlowable<R> | flatMapStream(@NonNull Function<? super T,? extends Stream<? extends R>> mapper, int prefetch) Maps each upstream item of each rail into a Stream and emits the Stream's items to the downstream in a sequential fashion. |
static <T> @NonNull ParallelFlowable<T> | from(@NonNull Publisher<? extends T> source) Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs) in a round-robin fashion. |
static <T> @NonNull ParallelFlowable<T> | from(@NonNull Publisher<? extends T> source, int parallelism) Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion. |
static <T> @NonNull ParallelFlowable<T> | from(@NonNull Publisher<? extends T> source, int parallelism, int prefetch) Take a Publisher and prepare to consume it on parallelism number of 'rails', possibly ordered and in a round-robin fashion, using a custom prefetch amount and queue for dealing with the source Publisher's values. |
static <T> @NonNull ParallelFlowable<T> | fromArray(@NonNull Publisher<T>... publishers) Wraps multiple Publishers into a ParallelFlowable which runs them in parallel and unordered. |
<R> @NonNull ParallelFlowable<R> | map(@NonNull Function<? super T,? extends R> mapper) Maps the source values on each 'rail' to another value. |
<R> @NonNull ParallelFlowable<R> | map(@NonNull Function<? super T,? extends R> mapper, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler) Maps the source values on each 'rail' to another value and handles errors based on the value returned by the handler function. |
<R> @NonNull ParallelFlowable<R> | map(@NonNull Function<? super T,? extends R> mapper, @NonNull ParallelFailureHandling errorHandler) Maps the source values on each 'rail' to another value and handles errors based on the given ParallelFailureHandling enumeration value. |
<R> @NonNull ParallelFlowable<R> | mapOptional(@NonNull Function<? super T,Optional<? extends R>> mapper) Maps the source values on each 'rail' to an optional and emits its value if any. |
<R> @NonNull ParallelFlowable<R> | mapOptional(@NonNull Function<? super T,Optional<? extends R>> mapper, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler) Maps the source values on each 'rail' to an optional and emits its value if any, and handles errors based on the value returned by the handler function. |
<R> @NonNull ParallelFlowable<R> | mapOptional(@NonNull Function<? super T,Optional<? extends R>> mapper, @NonNull ParallelFailureHandling errorHandler) Maps the source values on each 'rail' to an optional and emits its value if any, and handles errors based on the given ParallelFailureHandling enumeration value. |
abstract int | parallelism() Returns the number of expected parallel Subscribers. |
@NonNull Flowable<T> | reduce(@NonNull BiFunction<T,T,T> reducer) Reduces all values within a 'rail' and across 'rails' with a reducer function into one Flowable sequence. |
<R> @NonNull ParallelFlowable<R> | reduce(@NonNull Supplier<R> initialSupplier, @NonNull BiFunction<R,? super T,R> reducer) Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value. |
@NonNull ParallelFlowable<T> | runOn(@NonNull Scheduler scheduler) Specifies where each 'rail' will observe its incoming values, specified via a Scheduler, with no work-stealing and default prefetch amount. |
@NonNull ParallelFlowable<T> | runOn(@NonNull Scheduler scheduler, int prefetch) Specifies where each 'rail' will observe its incoming values, specified via a Scheduler, with possibly work-stealing and a given prefetch amount. |
@NonNull Flowable<T> | sequential() Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a default prefetch value for the rails. |
@NonNull Flowable<T> | sequential(int prefetch) Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a given prefetch value for the rails. |
@NonNull Flowable<T> | sequentialDelayError() Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a default prefetch value for the rails and delaying errors from all rails till all terminate. |
@NonNull Flowable<T> | sequentialDelayError(int prefetch) Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a given prefetch value for the rails and delaying errors from all rails till all terminate. |
@NonNull Flowable<T> | sorted(@NonNull Comparator<? super T> comparator) Sorts the 'rails' of this ParallelFlowable and returns a Flowable that sequentially picks the smallest next value from the rails. |
@NonNull Flowable<T> | sorted(@NonNull Comparator<? super T> comparator, int capacityHint) Sorts the 'rails' of this ParallelFlowable and returns a Flowable that sequentially picks the smallest next value from the rails. |
abstract void | subscribe(@NonNull Subscriber<? super T>[] subscribers) Subscribes an array of Subscribers to this ParallelFlowable and triggers the execution chain for all 'rails'. |
<R> R | to(@NonNull ParallelFlowableConverter<T,R> converter) Calls the specified converter function during assembly time and returns its resulting value. |
@NonNull Flowable<List<T>> | toSortedList(@NonNull Comparator<? super T> comparator) Sorts the 'rails' according to the comparator and returns a full sorted List as a Flowable. |
@NonNull Flowable<List<T>> | toSortedList(@NonNull Comparator<? super T> comparator, int capacityHint) Sorts the 'rails' according to the comparator and returns a full sorted List as a Flowable. |
protected boolean | validate(@NonNull Subscriber<?>[] subscribers) Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlowable. |
@BackpressureSupport(value=SPECIAL) @SchedulerSupport(value="none") public abstract void subscribe(@NonNull Subscriber<? super T>[] subscribers)

Subscribes an array of Subscribers to this ParallelFlowable and triggers the execution chain for all 'rails'.

Backpressure: the backpressure behavior/expectation is determined by the supplied Subscribers.
Scheduler: subscribe does not operate by default on a particular Scheduler.

Parameters:
subscribers - the subscribers array to run in parallel, the number of items must be equal to the parallelism level of this ParallelFlowable
Throws:
NullPointerException - if subscribers is null
See Also:
parallelism()
@CheckReturnValue public abstract int parallelism()

Returns the number of expected parallel Subscribers.

Returns:
the number of expected parallel Subscribers

protected final boolean validate(@NonNull Subscriber<?>[] subscribers)

Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlowable.

Parameters:
subscribers - the array of Subscribers
Returns:
true if the number of subscribers equals the parallelism level
Throws:
NullPointerException - if subscribers is null
IllegalArgumentException - if subscribers.length is different from parallelism()
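A minimal sketch of the subscribe contract (illustrative test-style code; TestSubscriber is from io.reactivex.rxjava3.subscribers): the array length must equal parallelism(), and with unbounded consumers the upstream items are dealt to the rails round-robin:

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

ParallelFlowable<Integer> pf = Flowable.range(1, 8).parallel(2);
System.out.println(pf.parallelism()); // 2

TestSubscriber<Integer> rail0 = new TestSubscriber<>();
TestSubscriber<Integer> rail1 = new TestSubscriber<>();

// The array length matches parallelism(), so validate() passes;
// a mismatched length would signal IllegalArgumentException instead.
pf.subscribe(new TestSubscriber[] { rail0, rail1 });

rail0.assertValueCount(4); // the 8 items are dealt round-robin, 4 per rail
rail1.assertValueCount(4);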
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=FULL) public static <T> @NonNull ParallelFlowable<T> from(@NonNull Publisher<? extends T> source)

Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs) in a round-robin fashion.

Backpressure: the operator requests the Flowable.bufferSize() amount from the upstream, followed by 75% of that amount requested after every 75% received.
Scheduler: from does not operate by default on a particular Scheduler.

Type Parameters:
T - the value type
Parameters:
source - the source Publisher
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if source is null
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=FULL) public static <T> @NonNull ParallelFlowable<T> from(@NonNull Publisher<? extends T> source, int parallelism)

Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion.

Backpressure: the operator requests the Flowable.bufferSize() amount from the upstream, followed by 75% of that amount requested after every 75% received.
Scheduler: from does not operate by default on a particular Scheduler.

Type Parameters:
T - the value type
Parameters:
source - the source Publisher
parallelism - the number of parallel rails
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if source is null
IllegalArgumentException - if parallelism is non-positive

@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=FULL) public static <T> @NonNull ParallelFlowable<T> from(@NonNull Publisher<? extends T> source, int parallelism, int prefetch)

Take a Publisher and prepare to consume it on parallelism number of 'rails', possibly ordered and in a round-robin fashion, using a custom prefetch amount and queue for dealing with the source Publisher's values.

Backpressure: the operator requests the prefetch amount from the upstream, followed by 75% of that amount requested after every 75% received.
Scheduler: from does not operate by default on a particular Scheduler.

Type Parameters:
T - the value type
Parameters:
source - the source Publisher
parallelism - the number of parallel rails
prefetch - the number of values to prefetch from the source; values are held until there is a rail ready to process them
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if source is null
IllegalArgumentException - if parallelism or prefetch is non-positive

@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=PASS_THROUGH) public final <R> @NonNull ParallelFlowable<R> map(@NonNull Function<? super T,? extends R> mapper)

Maps the source values on each 'rail' to another value.

Note that the same mapper function may be called from multiple threads concurrently.

Scheduler: map does not operate by default on a particular Scheduler.

Type Parameters:
R - the output value type
Parameters:
mapper - the mapper function turning Ts into Rs
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=PASS_THROUGH) public final <R> @NonNull ParallelFlowable<R> map(@NonNull Function<? super T,? extends R> mapper, @NonNull ParallelFailureHandling errorHandler)

Maps the source values on each 'rail' to another value and handles errors based on the given ParallelFailureHandling enumeration value.

Note that the same mapper function may be called from multiple threads concurrently.

Scheduler: map does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental

Type Parameters:
R - the output value type
Parameters:
mapper - the mapper function turning Ts into Rs
errorHandler - the enumeration that defines how to handle errors thrown from the mapper function
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper or errorHandler is null
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=PASS_THROUGH) public final <R> @NonNull ParallelFlowable<R> map(@NonNull Function<? super T,? extends R> mapper, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler)

Maps the source values on each 'rail' to another value and handles errors based on the value returned by the handler function.

Note that the same mapper function may be called from multiple threads concurrently.

Scheduler: map does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental

Type Parameters:
R - the output value type
Parameters:
mapper - the mapper function turning Ts into Rs
errorHandler - the function called with the current repeat count and failure Throwable; it should return one of the ParallelFailureHandling enumeration values to indicate how to proceed
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper or errorHandler is null
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=PASS_THROUGH) public final @NonNull ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate)

Filters the source values on each 'rail'.

Note that the same predicate may be called from multiple threads concurrently.

Scheduler: filter does not operate by default on a particular Scheduler.

Parameters:
predicate - the function returning true to keep a value or false to drop a value
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if predicate is null
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=PASS_THROUGH) public final @NonNull ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull ParallelFailureHandling errorHandler)

Filters the source values on each 'rail' and handles errors based on the given ParallelFailureHandling enumeration value.

Note that the same predicate may be called from multiple threads concurrently.

Scheduler: filter does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental

Parameters:
predicate - the function returning true to keep a value or false to drop a value
errorHandler - the enumeration that defines how to handle errors thrown from the predicate
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if predicate or errorHandler is null
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=PASS_THROUGH) public final @NonNull ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler)

Filters the source values on each 'rail' and handles errors based on the value returned by the handler function.

Note that the same predicate may be called from multiple threads concurrently.

Scheduler: filter does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental

Parameters:
predicate - the function returning true to keep a value or false to drop a value
errorHandler - the function called with the current repeat count and failure Throwable; it should return one of the ParallelFailureHandling enumeration values to indicate how to proceed
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if predicate or errorHandler is null
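As a sketch of these error-handling overloads (illustrative values), ParallelFailureHandling.SKIP drops the item that caused the failure while the rail keeps running, and ERROR re-signals the failure:

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelFailureHandling;
import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.range(1, 10)
        .parallel()
        .runOn(Schedulers.computation())
        .map(v -> 100 / (v - 5), ParallelFailureHandling.SKIP) // v == 5 divides by zero; SKIP drops it
        .filter(v -> v > 0, ParallelFailureHandling.ERROR)     // ERROR re-signals any predicate failure
        .sequential()
        .blockingSubscribe(System.out::println);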
@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="custom") public final @NonNull ParallelFlowable<T> runOn(@NonNull Scheduler scheduler)

Specifies where each 'rail' will observe its incoming values, specified via a Scheduler, with no work-stealing and default prefetch amount.

This operator uses the default prefetch size returned by Flowable.bufferSize().

The operator will call Scheduler.createWorker() as many times as this ParallelFlowable's parallelism level is.

No assumptions are made about the Scheduler's parallelism level; if the Scheduler's parallelism level is lower than the ParallelFlowable's, some rails may end up on the same thread/worker.

This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.

Backpressure: the operator requests the Flowable.bufferSize() amount from the upstream, followed by 75% of that amount requested after every 75% received.
Scheduler: runOn drains the upstream rails on the specified Scheduler's Workers.

Parameters:
scheduler - the scheduler to use
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if scheduler is null
@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="custom") public final @NonNull ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch)

Specifies where each 'rail' will observe its incoming values, specified via a Scheduler, with possibly work-stealing and a given prefetch amount.

This operator uses the given prefetch amount.

The operator will call Scheduler.createWorker() as many times as this ParallelFlowable's parallelism level is.

No assumptions are made about the Scheduler's parallelism level; if the Scheduler's parallelism level is lower than the ParallelFlowable's, some rails may end up on the same thread/worker.

This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.

Backpressure: the operator requests the prefetch amount from the upstream, followed by 75% of that amount requested after every 75% received.
Scheduler: runOn drains the upstream rails on the specified Scheduler's Workers.

Parameters:
scheduler - the scheduler to use
prefetch - the number of values to request on each 'rail' from the source
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if scheduler is null
IllegalArgumentException - if prefetch is non-positive

@CheckReturnValue @NonNull @BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public final @NonNull Flowable<T> reduce(@NonNull BiFunction<T,T,T> reducer)

Reduces all values within a 'rail' and across 'rails' with a reducer function into one Flowable sequence.

Note that the same reducer function may be called from multiple threads concurrently.

Backpressure: the operator consumes the upstream rails in an unbounded manner (requesting Long.MAX_VALUE).
Scheduler: reduce does not operate by default on a particular Scheduler.

Parameters:
reducer - the function to reduce two values into one
Returns:
the new Flowable instance emitting the reduced value or empty if the current ParallelFlowable is empty
Throws:
NullPointerException - if reducer is null
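A small sketch (illustrative values) of a parallel sum; the reducer must be associative because it combines values both within and across rails:

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

int sum = Flowable.range(1, 100)
        .parallel()
        .runOn(Schedulers.computation())
        .reduce(Integer::sum)  // combines within each rail, then across rails
        .blockingFirst();
System.out.println(sum);       // 5050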
@CheckReturnValue @NonNull @BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public final <R> @NonNull ParallelFlowable<R> reduce(@NonNull Supplier<R> initialSupplier, @NonNull BiFunction<R,? super T,R> reducer)

Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value.

Note that the same mapper function may be called from multiple threads concurrently.

Backpressure: the operator consumes the upstream rails in an unbounded manner (requesting Long.MAX_VALUE).
Scheduler: reduce does not operate by default on a particular Scheduler.

Type Parameters:
R - the reduced output type
Parameters:
initialSupplier - the supplier for the initial value
reducer - the function to reduce a previous output of reduce (or the initial value supplied) with a current source value
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if initialSupplier or reducer is null
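In contrast to reduce(BiFunction) above, this overload stays parallel: each rail gets its own initial value and emits one result. A sketch (illustrative; the number of rails depends on the available CPUs):

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.range(1, 100)
        .parallel()
        .runOn(Schedulers.computation())
        .reduce(() -> 0, (count, v) -> count + 1) // per-rail element count
        .sequential()
        .blockingSubscribe(System.out::println);  // one count per rail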
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue @NonNull public final @NonNull Flowable<T> sequential()

Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a default prefetch value for the rails.

This operator uses the default prefetch size returned by Flowable.bufferSize().

Backpressure: the operator requests the Flowable.bufferSize() amount from each rail, then requests from each rail 75% of this amount after 75% received.
Scheduler: sequential does not operate by default on a particular Scheduler.

Returns:
the new Flowable instance
See Also:
sequential(int), sequentialDelayError()
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue @NonNull public final @NonNull Flowable<T> sequential(int prefetch)

Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a given prefetch value for the rails.

Backpressure: the operator requests the prefetch amount from each rail, then requests from each rail 75% of this amount after 75% received.
Scheduler: sequential does not operate by default on a particular Scheduler.

Parameters:
prefetch - the prefetch amount to use for each rail
Returns:
the new Flowable instance
Throws:
IllegalArgumentException - if prefetch is non-positive
See Also:
sequential(), sequentialDelayError(int)
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue @NonNull public final @NonNull Flowable<T> sequentialDelayError()

Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a default prefetch value for the rails and delaying errors from all rails till all terminate.

This operator uses the default prefetch size returned by Flowable.bufferSize().

Backpressure: the operator requests the Flowable.bufferSize() amount from each rail, then requests from each rail 75% of this amount after 75% received.
Scheduler: sequentialDelayError does not operate by default on a particular Scheduler.
History: 2.0.7 - experimental

Returns:
the new Flowable instance
See Also:
sequentialDelayError(int), sequential()
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @CheckReturnValue @NonNull public final @NonNull Flowable<T> sequentialDelayError(int prefetch)

Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a given prefetch value for the rails and delaying errors from all rails till all terminate.

Backpressure: the operator requests the prefetch amount from each rail, then requests from each rail 75% of this amount after 75% received.
Scheduler: sequentialDelayError does not operate by default on a particular Scheduler.
History: 2.0.7 - experimental

Parameters:
prefetch - the prefetch amount to use for each rail
Returns:
the new Flowable instance
Throws:
IllegalArgumentException - if prefetch is non-positive
See Also:
sequential(), sequentialDelayError()
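A sketch contrasting the two merge modes (illustrative failure): with sequentialDelayError(), the values from the healthy rail are still delivered before the error is signalled, whereas sequential() may cut them short:

import io.reactivex.rxjava3.core.Flowable;

Flowable.range(1, 10)
        .parallel(2)
        .map(v -> {
            if (v == 3) throw new IllegalStateException("boom");
            return v;
        })
        .sequentialDelayError()
        .blockingSubscribe(
                System.out::println,
                e -> System.out.println("failed after the rest: " + e));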
@CheckReturnValue @NonNull @BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public final @NonNull Flowable<T> sorted(@NonNull Comparator<? super T> comparator)

Sorts the 'rails' of this ParallelFlowable and returns a Flowable that sequentially picks the smallest next value from the rails.

This operator requires a finite source ParallelFlowable.

Backpressure: the operator consumes the upstream rails in an unbounded manner (requesting Long.MAX_VALUE).
Scheduler: sorted does not operate by default on a particular Scheduler.

Parameters:
comparator - the comparator to use
Returns:
the new Flowable instance
Throws:
NullPointerException - if comparator is null
@CheckReturnValue @NonNull @BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public final @NonNull Flowable<T> sorted(@NonNull Comparator<? super T> comparator, int capacityHint)

Sorts the 'rails' of this ParallelFlowable and returns a Flowable that sequentially picks the smallest next value from the rails.

This operator requires a finite source ParallelFlowable.

Backpressure: the operator consumes the upstream rails in an unbounded manner (requesting Long.MAX_VALUE).
Scheduler: sorted does not operate by default on a particular Scheduler.

Parameters:
comparator - the comparator to use
capacityHint - the expected number of total elements
Returns:
the new Flowable instance
Throws:
NullPointerException - if comparator is null
IllegalArgumentException - if capacityHint is non-positive

@CheckReturnValue @NonNull @BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public final @NonNull Flowable<List<T>> toSortedList(@NonNull Comparator<? super T> comparator)

Sorts the 'rails' according to the comparator and returns a full sorted List as a Flowable.

This operator requires a finite source ParallelFlowable.

Backpressure: the operator consumes the upstream rails in an unbounded manner (requesting Long.MAX_VALUE).
Scheduler: toSortedList does not operate by default on a particular Scheduler.

Parameters:
comparator - the comparator to compare elements
Returns:
the new Flowable instance
Throws:
NullPointerException - if comparator is null
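A sketch of both sorting variants (illustrative data): each rail sorts its own share and the merge repeatedly picks the smallest head, which is why the source must be finite:

import java.util.Comparator;
import io.reactivex.rxjava3.core.Flowable;

Flowable.just(5, 1, 4, 2, 3)
        .parallel(2)
        .sorted(Comparator.<Integer>naturalOrder())
        .blockingSubscribe(System.out::println);   // 1 2 3 4 5

Flowable.just(5, 1, 4, 2, 3)
        .parallel(2)
        .toSortedList(Comparator.<Integer>naturalOrder())
        .blockingSubscribe(System.out::println);   // [1, 2, 3, 4, 5]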
@CheckReturnValue @NonNull @BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public final @NonNull Flowable<List<T>> toSortedList(@NonNull Comparator<? super T> comparator, int capacityHint)

Sorts the 'rails' according to the comparator and returns a full sorted List as a Flowable.

This operator requires a finite source ParallelFlowable.

Backpressure: the operator consumes the upstream rails in an unbounded manner (requesting Long.MAX_VALUE).
Scheduler: toSortedList does not operate by default on a particular Scheduler.

Parameters:
comparator - the comparator to compare elements
capacityHint - the expected number of total elements
Returns:
the new Flowable instance
Throws:
NullPointerException - if comparator is null
IllegalArgumentException - if capacityHint is non-positive

@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final @NonNull ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext)

Call the specified consumer with the current element passing through any 'rail'.

Scheduler: doOnNext does not operate by default on a particular Scheduler.

Parameters:
onNext - the callback
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if onNext is null
@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final @NonNull ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext, @NonNull ParallelFailureHandling errorHandler)

Call the specified consumer with the current element passing through any 'rail' and handle errors based on the given ParallelFailureHandling enumeration value.

Scheduler: doOnNext does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental

Parameters:
onNext - the callback
errorHandler - the enumeration that defines how to handle errors thrown from the onNext consumer
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if onNext or errorHandler is null
@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final @NonNull ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler)

Call the specified consumer with the current element passing through any 'rail' and handle errors based on the value returned by the handler function.

Scheduler: doOnNext does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental

Parameters:
onNext - the callback
errorHandler - the function called with the current repeat count and failure Throwable; it should return one of the ParallelFailureHandling enumeration values to indicate how to proceed
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if onNext or errorHandler is null
@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final @NonNull ParallelFlowable<T> doAfterNext(@NonNull Consumer<? super T> onAfterNext)

Call the specified consumer with the current element passing through any 'rail' after it has been delivered to downstream within the rail.

Scheduler: doAfterNext does not operate by default on a particular Scheduler.

Parameters:
onAfterNext - the callback
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if onAfterNext is null
@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final @NonNull ParallelFlowable<T> doOnError(@NonNull Consumer<? super Throwable> onError)

Call the specified consumer with the exception passing through any 'rail'.

Scheduler: doOnError does not operate by default on a particular Scheduler.

Parameters:
onError - the callback
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if onError is null
@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final @NonNull ParallelFlowable<T> doOnComplete(@NonNull Action onComplete)

Run the specified Action when a 'rail' completes.

Scheduler: doOnComplete does not operate by default on a particular Scheduler.

Parameters:
onComplete - the callback
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if onComplete is null
@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final @NonNull ParallelFlowable<T> doAfterTerminated(@NonNull Action onAfterTerminate)

Run the specified Action when a 'rail' completes or signals an error.

Scheduler: doAfterTerminated does not operate by default on a particular Scheduler.

Parameters:
onAfterTerminate - the callback
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if onAfterTerminate is null
@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final @NonNull ParallelFlowable<T> doOnSubscribe(@NonNull Consumer<? super Subscription> onSubscribe)

Call the specified callback when a 'rail' receives a Subscription from its upstream.

Scheduler: doOnSubscribe does not operate by default on a particular Scheduler.

Parameters:
onSubscribe - the callback
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if onSubscribe is null
@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final @NonNull ParallelFlowable<T> doOnRequest(@NonNull LongConsumer onRequest)

Call the specified consumer with the request amount if any rail receives a request.

Scheduler: doOnRequest does not operate by default on a particular Scheduler.

Parameters:
onRequest - the callback
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if onRequest is null
@BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") @CheckReturnValue @NonNull public final @NonNull ParallelFlowable<T> doOnCancel(@NonNull Action onCancel)

Run the specified Action when a 'rail' receives a cancellation.

Scheduler: doOnCancel does not operate by default on a particular Scheduler.

Parameters:
onCancel - the callback
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if onCancel is null
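A sketch of the peek hooks (illustrative): the callbacks run on whatever thread the given rail is currently using:

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.range(1, 4)
        .parallel(2)
        .runOn(Schedulers.computation())
        .doOnNext(v -> System.out.println(v + " on " + Thread.currentThread().getName()))
        .doOnComplete(() -> System.out.println("a rail completed"))
        .sequential()
        .blockingSubscribe();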
@CheckReturnValue @NonNull @BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public final <C> @NonNull ParallelFlowable<C> collect(@NonNull Supplier<? extends C> collectionSupplier, @NonNull BiConsumer<? super C,? super T> collector)

Collect the elements in each rail into a collection supplied via a collectionSupplier and collected into with a collector action, emitting the collection at the end.

Backpressure: the operator consumes the upstream rails in an unbounded manner (requesting Long.MAX_VALUE).
Scheduler: collect does not operate by default on a particular Scheduler.

Type Parameters:
C - the collection type
Parameters:
collectionSupplier - the supplier of the collection in each rail
collector - the collector, taking the per-rail collection and the current item
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if collectionSupplier or collector is null
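A sketch (illustrative) showing that the collection is per rail: each rail fills its own ArrayList, so one list is emitted per rail rather than a single global one:

import java.util.ArrayList;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.range(1, 10)
        .parallel(2)
        .runOn(Schedulers.computation())
        .collect(() -> new ArrayList<Integer>(), ArrayList::add)
        .sequential()
        .blockingSubscribe(System.out::println); // two lists, one per rail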
@CheckReturnValue @NonNull @SafeVarargs @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public static <T> @NonNull ParallelFlowable<T> fromArray(@NonNull Publisher<T>... publishers)

Wraps multiple Publishers into a ParallelFlowable which runs them in parallel and unordered.

Scheduler: fromArray does not operate by default on a particular Scheduler.

Type Parameters:
T - the value type
Parameters:
publishers - the array of publishers
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if publishers is null
IllegalArgumentException - if publishers is an empty array

@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final <R> R to(@NonNull ParallelFlowableConverter<T,R> converter)

Calls the specified converter function during assembly time and returns its resulting value.

This allows fluent conversion to any other type.

Scheduler: to does not operate by default on a particular Scheduler.
History: 2.1.7 - experimental

Type Parameters:
R - the resulting object type
Parameters:
converter - the function that receives the current ParallelFlowable instance and returns a value
Returns:
the converted value
Throws:
NullPointerException - if converter is null
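A sketch of fromArray (illustrative sources): the given Publishers themselves become the rails, so no round-robin splitting of a single source takes place:

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

ParallelFlowable.fromArray(Flowable.range(1, 5), Flowable.range(100, 5))
        .runOn(Schedulers.computation())
        .sequential()
        .blockingSubscribe(System.out::println);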
@CheckReturnValue @NonNull @BackpressureSupport(value=PASS_THROUGH) @SchedulerSupport(value="none") public final <U> @NonNull ParallelFlowable<U> compose(@NonNull ParallelTransformer<T,U> composer)

Allows composing operators, in assembly time, on top of this ParallelFlowable and returns another ParallelFlowable with composed features.

Scheduler: compose does not operate by default on a particular Scheduler.

Type Parameters:
U - the output value type
Parameters:
composer - the composer function from ParallelFlowable (this) to another ParallelFlowable
Returns:
the ParallelFlowable returned by the function
Throws:
NullPointerException - if composer is null
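A sketch of a reusable transformer (the name onComputation is illustrative) applied at assembly time:

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelTransformer;
import io.reactivex.rxjava3.schedulers.Schedulers;

ParallelTransformer<Integer, Integer> onComputation =
        upstream -> upstream.runOn(Schedulers.computation()).map(v -> v * v);

Flowable.range(1, 8)
        .parallel()
        .compose(onComputation)
        .sequential()
        .blockingSubscribe(System.out::println);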
@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <R> @NonNull ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper)

Generates and flattens Publishers on each 'rail'.

Errors are not delayed and the operator uses unbounded concurrency along with the default inner prefetch.

Backpressure: the operator requests the Flowable.bufferSize() amount from each rail upfront and keeps requesting as many items per rail as the number of inner sources that completed on that rail. The inner sources are requested the Flowable.bufferSize() amount upfront, then 75% of this amount requested after 75% received.
Scheduler: flatMap does not operate by default on a particular Scheduler.

Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <R> @NonNull ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError)

Generates and flattens Publishers on each 'rail', optionally delaying errors.

The operator uses unbounded concurrency along with the default inner prefetch.

Backpressure: the operator requests the Flowable.bufferSize() amount from each rail upfront and keeps requesting as many items per rail as the number of inner sources that completed on that rail. The inner sources are requested the Flowable.bufferSize() amount upfront, then 75% of this amount requested after 75% received.
Scheduler: flatMap does not operate by default on a particular Scheduler.

Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
delayError - whether the errors from the main and the inner sources should be delayed till everybody terminates
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <R> @NonNull ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency)

Generates and flattens Publishers on each 'rail', optionally delaying errors and having a total number of simultaneous subscriptions to the inner Publishers.

The operator uses the default inner prefetch.

Backpressure: the operator requests the maxConcurrency amount from each rail upfront and keeps requesting as many items per rail as the number of inner sources that completed on that rail. The inner sources are requested the Flowable.bufferSize() amount upfront, then 75% of this amount requested after 75% received.
Scheduler: flatMap does not operate by default on a particular Scheduler.

Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
delayError - whether the errors from the main and the inner sources should be delayed till everybody terminates
maxConcurrency - the maximum number of simultaneous subscriptions to the generated inner Publishers
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
IllegalArgumentException - if maxConcurrency is non-positive

@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <R> @NonNull ParallelFlowable<R> flatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency, int prefetch)

Generates and flattens Publishers on each 'rail', optionally delaying errors, having a total number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers.

Backpressure: the operator requests the maxConcurrency amount from each rail upfront and keeps requesting as many items per rail as the number of inner sources that completed on that rail. The inner sources are requested the prefetch amount upfront, then 75% of this amount requested after 75% received.
Scheduler: flatMap does not operate by default on a particular Scheduler.

Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
delayError - whether the errors from the main and the inner sources should be delayed till everybody terminates
maxConcurrency - the maximum number of simultaneous subscriptions to the generated inner Publishers
prefetch - the number of items to prefetch from each inner Publisher
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
IllegalArgumentException - if maxConcurrency or prefetch is non-positive

@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <R> @NonNull ParallelFlowable<R> concatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper)

Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront.

Scheduler: concatMap does not operate by default on a particular Scheduler.

Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
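A sketch contrasting the two expansion operators (illustrative inner sources): both expand per rail, but concatMap keeps each rail's inner sequences in order while flatMap may interleave them:

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.range(1, 6)
        .parallel(2)
        .runOn(Schedulers.computation())
        .concatMap(v -> Flowable.just(v, -v)) // ordered within a rail
        .sequential()
        .blockingSubscribe(System.out::println);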
@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <R> @NonNull ParallelFlowable<R> concatMap(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch)

Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront.

Backpressure: the operator requests the prefetch amount from each rail upfront and keeps requesting 75% of this amount after 75% received and the inner sources completed. Requests for the inner sources are determined by the downstream rails' backpressure behavior.
Scheduler: concatMap does not operate by default on a particular Scheduler.

Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
prefetch - the number of items to prefetch from each inner Publisher
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
IllegalArgumentException - if prefetch is non-positive

@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <R> @NonNull ParallelFlowable<R> concatMapDelayError(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, boolean tillTheEnd)

Generates and concatenates Publishers on each 'rail', optionally delaying errors and generating 2 publishers upfront.

Scheduler: concatMapDelayError does not operate by default on a particular Scheduler.

Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
tillTheEnd - if true, all errors from the upstream and inner Publishers are delayed till all of them terminate; if false, the error is emitted when an inner Publisher terminates
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <R> @NonNull ParallelFlowable<R> concatMapDelayError(@NonNull Function<? super T,? extends Publisher<? extends R>> mapper, int prefetch, boolean tillTheEnd)

Generates and concatenates Publishers on each 'rail', optionally delaying errors and using the given prefetch amount for generating Publishers upfront.

Backpressure: the operator requests the prefetch amount from each rail upfront and keeps requesting 75% of this amount after 75% received and the inner sources completed. Requests for the inner sources are determined by the downstream rails' backpressure behavior.
Scheduler: concatMapDelayError does not operate by default on a particular Scheduler.

Type Parameters:
R - the result type
Parameters:
mapper - the function to map each rail's value into a Publisher
prefetch - the number of items to prefetch from each inner Publisher
tillTheEnd - if true, all errors from the upstream and inner Publishers are delayed till all of them terminate; if false, the error is emitted when an inner Publisher terminates
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
IllegalArgumentException - if prefetch is non-positive

@CheckReturnValue @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @NonNull public final <U> @NonNull ParallelFlowable<U> flatMapIterable(@NonNull Function<? super T,? extends Iterable<? extends U>> mapper)

Returns a ParallelFlowable that merges each item emitted by the source on each rail with the values in an Iterable corresponding to that item that is generated by a selector.

Backpressure: the source ParallelFlowable is expected to honor backpressure as well; if the source ParallelFlowable violates the rule, the operator will signal a MissingBackpressureException.
Scheduler: flatMapIterable does not operate by default on a particular Scheduler.

Type Parameters:
U - the type of item emitted by the resulting Iterable
Parameters:
mapper - a function that returns an Iterable sequence of values for each item emitted by the source ParallelFlowable
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
See Also:
flatMapStream(Function)
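A sketch (illustrative data) of the Iterable expansion:

import java.util.Arrays;
import io.reactivex.rxjava3.core.Flowable;

Flowable.just("a,b", "c,d")
        .parallel(2)
        .flatMapIterable(s -> Arrays.asList(s.split(",")))
        .sequential()
        .blockingSubscribe(System.out::println);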
@CheckReturnValue @NonNull @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public final <U> @NonNull ParallelFlowable<U> flatMapIterable(@NonNull Function<? super T,? extends Iterable<? extends U>> mapper, int bufferSize)

Returns a ParallelFlowable that merges each item emitted by the source ParallelFlowable with the values in an Iterable corresponding to that item that is generated by a selector.

Backpressure: the source ParallelFlowable is expected to honor backpressure as well; if the source ParallelFlowable violates the rule, the operator will signal a MissingBackpressureException.
Scheduler: flatMapIterable does not operate by default on a particular Scheduler.

Type Parameters:
U - the type of item emitted by the resulting Iterable
Parameters:
mapper - a function that returns an Iterable sequence of values for each item emitted by the source ParallelFlowable
bufferSize - the number of elements to prefetch from each upstream rail
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
IllegalArgumentException - if bufferSize is non-positive
See Also:
flatMapStream(Function, int)
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=PASS_THROUGH) public final <R> @NonNull ParallelFlowable<R> mapOptional(@NonNull Function<? super T,Optional<? extends R>> mapper)

Maps the source values on each 'rail' to an optional and emits its value if any.

Note that the same mapper function may be called from multiple threads concurrently.

Scheduler: mapOptional does not operate by default on a particular Scheduler.

Type Parameters:
R - the output value type
Parameters:
mapper - the mapper function turning Ts into Optionals of Rs
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=PASS_THROUGH) public final <R> @NonNull ParallelFlowable<R> mapOptional(@NonNull Function<? super T,Optional<? extends R>> mapper, @NonNull ParallelFailureHandling errorHandler)

Maps the source values on each 'rail' to an optional and emits its value if any, and handles errors based on the given ParallelFailureHandling enumeration value.

Note that the same mapper function may be called from multiple threads concurrently.

Scheduler: mapOptional does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental

Type Parameters:
R - the output value type
Parameters:
mapper - the mapper function turning Ts into Optionals of Rs
errorHandler - the enumeration that defines how to handle errors thrown from the mapper function
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper or errorHandler is null
@CheckReturnValue @NonNull @SchedulerSupport(value="none") @BackpressureSupport(value=PASS_THROUGH) public final <R> @NonNull ParallelFlowable<R> mapOptional(@NonNull Function<? super T,Optional<? extends R>> mapper, @NonNull BiFunction<? super Long,? super Throwable,ParallelFailureHandling> errorHandler)

Maps the source values on each 'rail' to an optional and emits its value if any, and handles errors based on the value returned by the handler function.

Note that the same mapper function may be called from multiple threads concurrently.

Scheduler: mapOptional does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental

Type Parameters:
R - the output value type
Parameters:
mapper - the mapper function turning Ts into Optionals of Rs
errorHandler - the function called with the current repeat count and failure Throwable; it should return one of the ParallelFailureHandling enumeration values to indicate how to proceed
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper or errorHandler is null
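A sketch of mapOptional as a combined map-and-filter (illustrative predicate): empty Optionals are simply dropped:

import java.util.Optional;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.range(1, 10)
        .parallel()
        .runOn(Schedulers.computation())
        .mapOptional(v -> v % 2 == 0 ? Optional.of(v * v) : Optional.<Integer>empty())
        .sequential()
        .blockingSubscribe(System.out::println); // squares of the even values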
@CheckReturnValue @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @NonNull public final <R> @NonNull ParallelFlowable<R> flatMapStream(@NonNull Function<? super T,? extends Stream<? extends R>> mapper)

Maps each upstream item on each rail into a Stream and emits the Stream's items to the downstream in a sequential fashion.

Due to the blocking and sequential nature of Java Streams, the streams are mapped and consumed in a sequential fashion without interleaving (unlike a more general flatMap(Function)). Therefore, flatMapStream and concatMapStream are identical operators and are provided as aliases.

The operator closes the Stream upon cancellation and when it terminates. The exceptions raised when closing a Stream are routed to the global error handler (RxJavaPlugins.onError(Throwable)).

If a Stream should not be closed, turn it into an Iterable and use flatMapIterable(Function):

source.flatMapIterable(v -> createStream(v)::iterator);

Note that Streams can be consumed only once; any subsequent attempt to consume a Stream will result in an IllegalStateException.

Primitive streams are not supported and items have to be boxed manually (e.g., via IntStream.boxed()):

source.flatMapStream(v -> IntStream.rangeClosed(v + 1, v + 10).boxed());

Stream does not support concurrent usage, so creating and/or consuming the same instance multiple times from multiple threads can lead to undefined behavior.

Backpressure: the operator requests Flowable.bufferSize() items of the upstream (then 75% of it after the 75% received) and caches them until they are ready to be mapped into Streams after the current Stream has been consumed.
Scheduler: flatMapStream does not operate by default on a particular Scheduler.

Type Parameters:
R - the element type of the Streams and the result
Parameters:
mapper - the function that receives an upstream item and should return a Stream whose elements will be emitted to the downstream
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
See Also:
flatMap(Function), flatMapIterable(Function), flatMapStream(Function, int)
@CheckReturnValue @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @NonNull public final <R> @NonNull ParallelFlowable<R> flatMapStream(@NonNull Function<? super T,? extends Stream<? extends R>> mapper, int prefetch)

Maps each upstream item of each rail into a Stream and emits the Stream's items to the downstream in a sequential fashion.

Due to the blocking and sequential nature of Java Streams, the streams are mapped and consumed in a sequential fashion without interleaving (unlike a more general flatMap(Function)). Therefore, flatMapStream and concatMapStream are identical operators and are provided as aliases.

The operator closes the Stream upon cancellation and when it terminates. The exceptions raised when closing a Stream are routed to the global error handler (RxJavaPlugins.onError(Throwable)).

If a Stream should not be closed, turn it into an Iterable and use flatMapIterable(Function, int):

source.flatMapIterable(v -> createStream(v)::iterator, 32);

Note that Streams can be consumed only once; any subsequent attempt to consume a Stream will result in an IllegalStateException.

Primitive streams are not supported and items have to be boxed manually (e.g., via IntStream.boxed()):

source.flatMapStream(v -> IntStream.rangeClosed(v + 1, v + 10).boxed(), 32);

Stream does not support concurrent usage, so creating and/or consuming the same instance multiple times from multiple threads can lead to undefined behavior.

Backpressure: the operator requests the given prefetch amount of items from the upstream (then 75% of it after the 75% received) and caches them until they are ready to be mapped into Streams after the current Stream has been consumed.
Scheduler: flatMapStream does not operate by default on a particular Scheduler.

Type Parameters:
R - the element type of the Streams and the result
Parameters:
mapper - the function that receives an upstream item and should return a Stream whose elements will be emitted to the downstream
prefetch - the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received
Returns:
the new ParallelFlowable instance
Throws:
NullPointerException - if mapper is null
IllegalArgumentException - if prefetch is non-positive
See Also:
flatMap(Function, boolean, int), flatMapIterable(Function, int)
@CheckReturnValue @NonNull @BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public final <A,R> @NonNull Flowable<R> collect(@NonNull Collector<T,A,R> collector)

Reduces all values within a 'rail' and across 'rails' with the callbacks of the given Collector into one Flowable containing a single value.

Each parallel rail receives its own Collector.accumulator() and Collector.combiner().

Backpressure: the operator consumes the upstream rails in an unbounded manner (requesting Long.MAX_VALUE).
Scheduler: collect does not operate by default on a particular Scheduler.

Type Parameters:
A - the accumulator type
R - the output value type
Parameters:
collector - the Collector instance
Returns:
the new Flowable instance emitting the collected value
Throws:
NullPointerException - if collector is null
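A sketch bridging to the java.util.stream collectors (assuming Java 8+): each rail gets its own accumulator and the per-rail containers are merged via the Collector's combiner:

import java.util.List;
import java.util.stream.Collectors;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

List<Integer> all = Flowable.range(1, 100)
        .parallel()
        .runOn(Schedulers.computation())
        .collect(Collectors.toList())
        .blockingFirst();
System.out.println(all.size()); // 100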