S
- the type of the user-define state used in generateState(S)
,
next(S, Subscriber)
, and
onUnsubscribe(S)
.T
- the type of Subscribers
that will be compatible with this
.public abstract class SyncOnSubscribe<S,T> extends java.lang.Object implements Observable.OnSubscribe<T>
OnSubscribe<T>
functions that responds correctly to back
pressure requests from subscribers. This is an improvement over
Observable.create(OnSubscribe)
which does not provide
any means of managing back pressure requests out-of-the-box.Constructor and Description |
---|
SyncOnSubscribe() |
Modifier and Type | Method and Description |
---|---|
void |
call(Subscriber<? super T> subscriber) |
static <S,T> SyncOnSubscribe<S,T> |
createSingleState(Func0<? extends S> generator,
Action2<? super S,? super Observer<? super T>> next)
Generates a synchronous
SyncOnSubscribe that calls the provided next function
to generate data to downstream subscribers. |
static <S,T> SyncOnSubscribe<S,T> |
createSingleState(Func0<? extends S> generator,
Action2<? super S,? super Observer<? super T>> next,
Action1<? super S> onUnsubscribe)
Generates a synchronous
SyncOnSubscribe that calls the provided next function
to generate data to downstream subscribers. |
static <S,T> SyncOnSubscribe<S,T> |
createStateful(Func0<? extends S> generator,
Func2<? super S,? super Observer<? super T>,? extends S> next)
Generates a synchronous
SyncOnSubscribe that calls the provided next function
to generate data to downstream subscribers. |
static <S,T> SyncOnSubscribe<S,T> |
createStateful(Func0<? extends S> generator,
Func2<? super S,? super Observer<? super T>,? extends S> next,
Action1<? super S> onUnsubscribe)
Generates a synchronous
SyncOnSubscribe that calls the provided next function
to generate data to downstream subscribers. |
static <T> SyncOnSubscribe<java.lang.Void,T> |
createStateless(Action1<? super Observer<? super T>> next)
Generates a synchronous
SyncOnSubscribe that calls the provided next function
to generate data to downstream subscribers. |
static <T> SyncOnSubscribe<java.lang.Void,T> |
createStateless(Action1<? super Observer<? super T>> next,
Action0 onUnsubscribe)
Generates a synchronous
SyncOnSubscribe that calls the provided next function
to generate data to downstream subscribers. |
protected abstract S |
generateState()
Executed once when subscribed to by a subscriber (via
call(Subscriber) )
to produce a state value. |
protected abstract S |
next(S state,
Observer<? super T> observer)
Called to produce data to the downstream subscribers.
|
protected void |
onUnsubscribe(S state)
Clean up behavior that is executed after the downstream subscriber's subscription is
unsubscribed.
|
public final void call(Subscriber<? super T> subscriber)
call
in interface Action1<Subscriber<? super T>>
protected abstract S generateState()
call(Subscriber)
)
to produce a state value. This value is passed into next(S
state, Observer observer)
on the first iteration. Subsequent iterations of next
will receive the state returned by the previous invocation of next
.protected abstract S next(S state, Observer<? super T> observer)
observer.onNext(t)
. To signal an error condition call
observer.onError(throwable)
or throw an Exception. To signal the end of a data stream
call observer.onCompleted()
. Implementations of this method must follow the following rules.
observer.onNext(t)
more than 1 time per invocation.observer.onNext(t)
concurrently.state
argument of the next invocation of this method.state
- the state value (from generateState()
on the first invocation or the
previous invocation of this method.observer
- the observer of data emitted byprotected void onUnsubscribe(S state)
state
- the last state value prior from generateState()
or
next(S, Observer<T>)
before unsubscribe.public static <S,T> SyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action2<? super S,? super Observer<? super T>> next)
SyncOnSubscribe
that calls the provided next
function
to generate data to downstream subscribers.T
- the type of the generated valuesS
- the type of the associated state with each Subscribergenerator
- generates the initial state value (see generateState()
)next
- produces data to the downstream subscriber (see next(S, Subscriber)
)public static <S,T> SyncOnSubscribe<S,T> createSingleState(Func0<? extends S> generator, Action2<? super S,? super Observer<? super T>> next, Action1<? super S> onUnsubscribe)
SyncOnSubscribe
that calls the provided next
function
to generate data to downstream subscribers.
This overload creates a SyncOnSubscribe without an explicit clean up step.T
- the type of the generated valuesS
- the type of the associated state with each Subscribergenerator
- generates the initial state value (see generateState()
)next
- produces data to the downstream subscriber (see next(S, Subscriber)
)onUnsubscribe
- clean up behavior (see onUnsubscribe(S)
)public static <S,T> SyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func2<? super S,? super Observer<? super T>,? extends S> next, Action1<? super S> onUnsubscribe)
SyncOnSubscribe
that calls the provided next
function
to generate data to downstream subscribers.T
- the type of the generated valuesS
- the type of the associated state with each Subscribergenerator
- generates the initial state value (see generateState()
)next
- produces data to the downstream subscriber (see next(S, Subscriber)
)onUnsubscribe
- clean up behavior (see onUnsubscribe(S)
)public static <S,T> SyncOnSubscribe<S,T> createStateful(Func0<? extends S> generator, Func2<? super S,? super Observer<? super T>,? extends S> next)
SyncOnSubscribe
that calls the provided next
function
to generate data to downstream subscribers.T
- the type of the generated valuesS
- the type of the associated state with each Subscribergenerator
- generates the initial state value (see generateState()
)next
- produces data to the downstream subscriber (see next(S, Subscriber)
)public static <T> SyncOnSubscribe<java.lang.Void,T> createStateless(Action1<? super Observer<? super T>> next)
SyncOnSubscribe
that calls the provided next
function
to generate data to downstream subscribers.
This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state
value. This should be used when the next
function closes over it's state.T
- the type of the generated valuesnext
- produces data to the downstream subscriber (see next(S, Subscriber)
)public static <T> SyncOnSubscribe<java.lang.Void,T> createStateless(Action1<? super Observer<? super T>> next, Action0 onUnsubscribe)
SyncOnSubscribe
that calls the provided next
function
to generate data to downstream subscribers.
This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state
value. This should be used when the next
function closes over it's state.T
- the type of the generated valuesnext
- produces data to the downstream subscriber (see next(S, Subscriber)
)onUnsubscribe
- clean up behavior (see onUnsubscribe(S)
)