S - the type of the user-defined state used in generateState(), next(S, Long, Observer), and onUnsubscribe(S).
T - the type of Subscribers that will be compatible with this.
@Beta public abstract class AsyncOnSubscribe<S,T> extends java.lang.Object implements Observable.OnSubscribe<T>
A utility class to create OnSubscribe<T> functions that respond correctly to back pressure requests from subscribers. This is an improvement over Observable.create(OnSubscribe), which does not provide any means of managing back pressure requests out-of-the-box. This variant of an OnSubscribe function allows for the asynchronous processing of requests.

Constructor and Description |
---|
AsyncOnSubscribe() |
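
As a rough illustration of what responding to back pressure requests means, here is a minimal sketch in plain Java with no RxJava dependency. `RequestDrivenSource` and its `request` method are hypothetical names invented for this example and are not part of any library. The only point is that the source hands out no more items than the consumer asked for.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a back-pressure-aware source; NOT an RxJava type.
class RequestDrivenSource {
    private int nextValue = 1;    // next value to hand out
    private final int lastValue;  // inclusive end of the stream

    RequestDrivenSource(int lastValue) {
        this.lastValue = lastValue;
    }

    // Returns at most `requested` values, and fewer once the stream runs out.
    List<Integer> request(long requested) {
        List<Integer> batch = new ArrayList<>();
        while (batch.size() < requested && nextValue <= lastValue) {
            batch.add(nextValue++);
        }
        return batch;
    }

    public static void main(String[] args) {
        RequestDrivenSource source = new RequestDrivenSource(5);
        System.out.println(source.request(2));   // prints [1, 2]
        System.out.println(source.request(10));  // prints [3, 4, 5]
    }
}
```

A source built with plain Observable.create(OnSubscribe) has no such bookkeeping unless the author writes it by hand, which is the gap this class is meant to close.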
Modifier and Type | Method and Description |
---|---|
void | call(Subscriber<? super T> actualSubscriber) |
static <S,T> AsyncOnSubscribe<S,T> | createSingleState(Func0<? extends S> generator, Action3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>> next) Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
static <S,T> AsyncOnSubscribe<S,T> | createSingleState(Func0<? extends S> generator, Action3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>> next, Action1<? super S> onUnsubscribe) Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
static <S,T> AsyncOnSubscribe<S,T> | createStateful(Func0<? extends S> generator, Func3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>,? extends S> next) Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
static <S,T> AsyncOnSubscribe<S,T> | createStateful(Func0<? extends S> generator, Func3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>,? extends S> next, Action1<? super S> onUnsubscribe) Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
static <T> AsyncOnSubscribe<java.lang.Void,T> | createStateless(Action2<java.lang.Long,? super Observer<Observable<? extends T>>> next) Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
static <T> AsyncOnSubscribe<java.lang.Void,T> | createStateless(Action2<java.lang.Long,? super Observer<Observable<? extends T>>> next, Action0 onUnsubscribe) Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers. |
protected abstract S | generateState() Executed once when subscribed to by a subscriber (via call(Subscriber)) to produce a state value. |
protected abstract S | next(S state, long requested, Observer<Observable<? extends T>> observer) Called to produce data to the downstream subscribers. |
protected void | onUnsubscribe(S state) Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed. |
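
The three protected methods in the table form a small lifecycle: one state is generated per subscription, each call to next receives the state returned by the previous call, and the last state is handed to the clean up hook. The following is a minimal sketch of that flow in plain Java, under stated assumptions: `LifecycleSketch`, `CounterSketch`, and `drive` are hypothetical names, a `List<T>` stands in for the `Observable<T>` that the real observer receives, and the driver is synchronous and single-threaded, which the real class is not required to be.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the three callbacks; it is NOT the RxJava class.
// A List<T> stands in for the Observable<T> passed to the real observer.
abstract class LifecycleSketch<S, T> {
    protected abstract S generateState();
    protected abstract S next(S state, long requested, Consumer<List<T>> observer);
    protected void onUnsubscribe(S state) { }

    // Drives one subscription: one generateState call, one next call per
    // request, then a single onUnsubscribe call with the last state.
    final List<T> drive(long... requests) {
        List<T> received = new ArrayList<>();
        S state = generateState();
        for (long requested : requests) {
            state = next(state, requested, received::addAll);
        }
        onUnsubscribe(state);
        return received;
    }
}

// Emits consecutive integers; the state is the next integer to emit.
class CounterSketch extends LifecycleSketch<Integer, Integer> {
    final List<String> log = new ArrayList<>();

    @Override
    protected Integer generateState() {
        log.add("generateState");
        return 0;
    }

    @Override
    protected Integer next(Integer state, long requested, Consumer<List<Integer>> observer) {
        log.add("next(state=" + state + ", requested=" + requested + ")");
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i < requested; i++) {
            batch.add(state + i);
        }
        observer.accept(batch);          // exactly one emission per invocation
        return state + (int) requested;  // becomes the state of the next call
    }

    @Override
    protected void onUnsubscribe(Integer state) {
        log.add("onUnsubscribe(state=" + state + ")");
    }

    public static void main(String[] args) {
        CounterSketch sketch = new CounterSketch();
        System.out.println(sketch.drive(2, 3));  // prints [0, 1, 2, 3, 4]
        sketch.log.forEach(System.out::println);
    }
}
```

In this sketch each batch contains exactly `requested` items; the documented requirement is only that an emitted observable should not exceed the requested amount.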
protected abstract S generateState()
Executed once when subscribed to by a subscriber (via call(Subscriber)) to produce a state value. This value is passed into next(S state, long requested, Observer observer) on the first iteration. Subsequent iterations of next will receive the state returned by the previous invocation of next.
protected abstract S next(S state, long requested, Observer<Observable<? extends T>> observer)
Called to produce data to the downstream subscribers. To emit data to a downstream subscriber, call observer.onNext(t). To signal an error condition, call observer.onError(throwable) or throw an Exception. To signal the end of a data stream, call observer.onCompleted(). Implementations of this method must follow these rules:
- Must not call observer.onNext(t) more than once per invocation.
- Must not call observer.onNext(t) concurrently.
The value returned from an invocation of this method is passed in as the state argument of the next invocation of this method.
state - the state value (from generateState() on the first invocation, or from the previous invocation of this method)
requested - the amount of data requested. An observable emitted to the observer should not exceed this amount.
observer - the observer of data emitted by
protected void onUnsubscribe(S state)
Clean up behavior that is executed after the downstream subscriber's subscription is unsubscribed.
state - the last state value returned from next(S, long, Observer) or generateState() at the time when a terminal event is emitted from next(S, long, Observer) or when unsubscribing.
public static <S,T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>> next)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
T - the type of the generated values
S - the type of the state associated with each Subscriber
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
public static <S,T> AsyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>> next, Action1<? super S> onUnsubscribe)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
This overload creates an AsyncOnSubscribe with an explicit clean up step, supplied as onUnsubscribe.
T - the type of the generated values
S - the type of the state associated with each Subscriber
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
public static <S,T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>,? extends S> next, Action1<? super S> onUnsubscribe)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
T - the type of the generated values
S - the type of the state associated with each Subscriber
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
public static <S,T> AsyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func3<? super S,java.lang.Long,? super Observer<Observable<? extends T>>,? extends S> next)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
T - the type of the generated values
S - the type of the state associated with each Subscriber
generator - generates the initial state value (see generateState())
next - produces data to the downstream subscriber (see next(S, long, Observer))
public static <T> AsyncOnSubscribe<java.lang.Void,T> createStateless(Action2<java.lang.Long,? super Observer<Observable<? extends T>>> next)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over its state.
T - the type of the generated values
next - produces data to the downstream subscriber (see next(S, long, Observer))
public static <T> AsyncOnSubscribe<java.lang.Void,T> createStateless(Action2<java.lang.Long,? super Observer<Observable<? extends T>>> next, Action0 onUnsubscribe)
Generates a synchronous AsyncOnSubscribe that calls the provided next function to generate data to downstream subscribers.
This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over its state.
T - the type of the generated values
next - produces data to the downstream subscriber (see next(S, long, Observer))
onUnsubscribe - clean up behavior (see onUnsubscribe(S))
public final void call(Subscriber<? super T> actualSubscriber)
Specified by: call in interface Action1<Subscriber<? super T>>
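
The createStateful and createSingleState factories differ in the shape of their next argument: a Func3 returns the state for the following call, while an Action3 returns nothing, so the same state object is carried forward and is typically mutable. A minimal sketch of that difference in plain Java follows. Everything here is an assumption made for illustration: `FactoryStyleSketch`, `StatefulNext`, `SingleStateNext`, `runStateful`, and `runSingleState` are hypothetical names, a `List<T>` stands in for `Observable<T>`, and the synchronous loop is a stand-in for the real request handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, dependency-free model of the two factory styles; none of
// these names are RxJava types. List<T> stands in for Observable<T>.
class FactoryStyleSketch {
    interface StatefulNext<S, T> {     // plays the role of Func3: returns the next state
        S apply(S state, long requested, Consumer<List<T>> observer);
    }

    interface SingleStateNext<S, T> {  // plays the role of Action3: returns nothing
        void call(S state, long requested, Consumer<List<T>> observer);
    }

    // "Stateful" style: each call yields the state for the following call.
    static <S, T> List<T> runStateful(Supplier<S> generator, StatefulNext<S, T> next, long... requests) {
        List<T> out = new ArrayList<>();
        S state = generator.get();
        for (long n : requests) {
            state = next.apply(state, n, out::addAll);
        }
        return out;
    }

    // "Single state" style: adapts the action so the same state object is reused.
    static <S, T> List<T> runSingleState(Supplier<S> generator, SingleStateNext<S, T> next, long... requests) {
        return runStateful(generator, (state, n, observer) -> {
            next.call(state, n, observer);
            return state;
        }, requests);
    }

    // Immutable Integer state: the function returns the advanced counter.
    static List<Integer> statefulDemo() {
        return runStateful(() -> 0, (state, n, observer) -> {
            List<Integer> batch = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                batch.add(state + i);
            }
            observer.accept(batch);
            return state + (int) n;
        }, 2, 2);
    }

    // Mutable int[] state: the action advances the counter in place.
    static List<Integer> singleStateDemo() {
        return runSingleState(() -> new int[] {0}, (state, n, observer) -> {
            List<Integer> batch = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                batch.add(state[0]++);
            }
            observer.accept(batch);
        }, 2, 2);
    }

    public static void main(String[] args) {
        System.out.println(statefulDemo());     // prints [0, 1, 2, 3]
        System.out.println(singleStateDemo());  // prints [0, 1, 2, 3]
    }
}
```

Both demos produce the same values. The stateful form suits immutable state, and the single-state form suits a mutable holder such as a cursor or connection that is created once per subscriber.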