Class ConnectableObservable<T>
- Type Parameters:
T- the type of items emitted by theConnectableObservable
- All Implemented Interfaces:
ObservableSource<T>
ConnectableObservable resembles an ordinary Observable, except that it does not begin
emitting items when it is subscribed to, but only when its connect(Consumer) method is called. In this way you
can wait for all intended Observers to Observable.subscribe() to the Observable
before the Observable begins emitting items.
When the upstream terminates, the ConnectableObservable remains in this terminated state and,
depending on the actual underlying implementation, relays cached events to late Observers.
In order to reuse and restart this ConnectableObservable, the reset() method has to be called.
When called, this ConnectableObservable will appear as fresh, unconnected source to new Observers.
Disposing the connection will reset the ConnectableObservable to its fresh state and there is no need to call
reset() in this case.
Note that although connect() and reset() are safe to call from multiple threads, it is recommended
a dedicated thread or business logic manages the connection or resetting of a ConnectableObservable so that
there is no unwanted signal loss due to early connect() or reset() calls while Observers are
still being subscribed to to this ConnectableObservable to receive signals from the get go.
- See Also:
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionReturns anObservablethat automatically connects (at most once) to thisConnectableObservablewhen the firstObserversubscribes.autoConnect(int numberOfObservers) Returns anObservablethat automatically connects (at most once) to thisConnectableObservablewhen the specified number ofObservers subscribe to it.autoConnect(int numberOfObservers, @NonNull Consumer<? super Disposable> connection) Returns anObservablethat automatically connects (at most once) to thisConnectableObservablewhen the specified number ofObservers subscribe to it and calls the specified callback with theDisposableassociated with the established connection.final @NonNull Disposableconnect()Instructs theConnectableObservableto begin emitting the items from its underlyingObservableto itsObservers.abstract voidconnect(@NonNull Consumer<? super Disposable> connection) Instructs theConnectableObservableto begin emitting the items from its underlyingObservableto itsObservers.refCount()Returns anObservablethat stays connected to thisConnectableObservableas long as there is at least one subscription to thisConnectableObservable.final @NonNull Observable<T> refCount(int observerCount) Connects to the upstreamConnectableObservableif the number of subscribed observers reaches the specified count and disconnect if allObservers have unsubscribed.final @NonNull Observable<T> Connects to the upstreamConnectableObservableif the number of subscribed observers reaches the specified count and disconnect after the specified timeout if allObservers have unsubscribed.final @NonNull Observable<T> Connects to the upstreamConnectableObservableif the number of subscribed observers reaches the specified count and disconnect after the specified timeout if allObservers have unsubscribed.final @NonNull Observable<T> Connects to the upstreamConnectableObservableif the number of subscribed observers reaches 1 and disconnect after the specified timeout if allObservers have unsubscribed.final @NonNull Observable<T> Connects to the upstreamConnectableObservableif the number of subscribed observers reaches 1 and disconnect after the specified timeout if allObservers have unsubscribed.abstract voidreset()Resets thisConnectableObservableinto its fresh state if it has terminated or has been disposed.Methods inherited from class Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeActual, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
Constructor Details
-
ConnectableObservable
public ConnectableObservable()
-
-
Method Details
-
connect
@SchedulerSupport("none") public abstract void connect(@NonNull @NonNull Consumer<? super Disposable> connection) Instructs theConnectableObservableto begin emitting the items from its underlyingObservableto itsObservers.- Scheduler:
- The behavior is determined by the implementor of this abstract class.
- Parameters:
connection- the action that receives the connection subscription before the subscription to source happens allowing the caller to synchronously disconnect a synchronous source- Throws:
NullPointerException- ifconnectionisnull- See Also:
-
reset
Resets thisConnectableObservableinto its fresh state if it has terminated or has been disposed.Calling this method on a fresh or active
ConnectableObservablehas no effect.- Scheduler:
- The behavior is determined by the implementor of this abstract class.
- Since:
- 3.0.0
-
connect
Instructs theConnectableObservableto begin emitting the items from its underlyingObservableto itsObservers.To disconnect from a synchronous source, use the
connect(Consumer)method.- Scheduler:
- The behavior is determined by the implementor of this abstract class.
- Returns:
- the
Disposablerepresenting the connection - See Also:
-
refCount
Returns anObservablethat stays connected to thisConnectableObservableas long as there is at least one subscription to thisConnectableObservable.- Scheduler:
- This
refCountoverload does not operate on any particularScheduler.
- Returns:
- a new
Observableinstance - See Also:
-
refCount
@CheckReturnValue @SchedulerSupport("none") @NonNull public final @NonNull Observable<T> refCount(int observerCount) Connects to the upstreamConnectableObservableif the number of subscribed observers reaches the specified count and disconnect if allObservers have unsubscribed.- Scheduler:
- This
refCountoverload does not operate on any particularScheduler.
History: 2.1.14 - experimental
- Parameters:
observerCount- the number ofObservers required to connect to the upstream- Returns:
- the new
Observableinstance - Throws:
IllegalArgumentException- ifobserverCountis non-positive- Since:
- 2.2
-
refCount
@CheckReturnValue @SchedulerSupport("io.reactivex:computation") @NonNull public final @NonNull Observable<T> refCount(long timeout, @NonNull @NonNull TimeUnit unit) Connects to the upstreamConnectableObservableif the number of subscribed observers reaches 1 and disconnect after the specified timeout if allObservers have unsubscribed.- Scheduler:
- This
refCountoverload operates on thecomputationScheduler.
History: 2.1.14 - experimental
- Parameters:
timeout- the time to wait before disconnecting after allObservers unsubscribedunit- the time unit of the timeout- Returns:
- the new
Observableinstance - Throws:
NullPointerException- ifunitisnull- Since:
- 2.2
- See Also:
-
refCount
@CheckReturnValue @SchedulerSupport("custom") @NonNull public final @NonNull Observable<T> refCount(long timeout, @NonNull @NonNull TimeUnit unit, @NonNull @NonNull Scheduler scheduler) Connects to the upstreamConnectableObservableif the number of subscribed observers reaches 1 and disconnect after the specified timeout if allObservers have unsubscribed.- Scheduler:
- This
refCountoverload operates on the specifiedScheduler.
History: 2.1.14 - experimental
- Parameters:
timeout- the time to wait before disconnecting after allObservers unsubscribedunit- the time unit of the timeoutscheduler- the target scheduler to wait on before disconnecting- Returns:
- the new
Observableinstance - Throws:
NullPointerException- ifunitorschedulerisnull- Since:
- 2.2
-
refCount
@CheckReturnValue @SchedulerSupport("io.reactivex:computation") @NonNull public final @NonNull Observable<T> refCount(int observerCount, long timeout, @NonNull @NonNull TimeUnit unit) Connects to the upstreamConnectableObservableif the number of subscribed observers reaches the specified count and disconnect after the specified timeout if allObservers have unsubscribed.- Scheduler:
- This
refCountoverload operates on thecomputationScheduler.
History: 2.1.14 - experimental
- Parameters:
observerCount- the number ofObservers required to connect to the upstreamtimeout- the time to wait before disconnecting after allObservers unsubscribedunit- the time unit of the timeout- Returns:
- the new
Observableinstance - Throws:
NullPointerException- ifunitorschedulerisnullIllegalArgumentException- ifobserverCountis non-positive- Since:
- 2.2
- See Also:
-
refCount
@CheckReturnValue @SchedulerSupport("custom") @NonNull public final @NonNull Observable<T> refCount(int observerCount, long timeout, @NonNull @NonNull TimeUnit unit, @NonNull @NonNull Scheduler scheduler) Connects to the upstreamConnectableObservableif the number of subscribed observers reaches the specified count and disconnect after the specified timeout if allObservers have unsubscribed.- Scheduler:
- This
refCountoverload operates on the specifiedScheduler.
History: 2.1.14 - experimental
- Parameters:
observerCount- the number ofObservers required to connect to the upstreamtimeout- the time to wait before disconnecting after allObservers unsubscribedunit- the time unit of the timeoutscheduler- the target scheduler to wait on before disconnecting- Returns:
- the new
Observableinstance - Throws:
NullPointerException- ifunitorschedulerisnullIllegalArgumentException- ifobserverCountis non-positive- Since:
- 2.2
-
autoConnect
Returns anObservablethat automatically connects (at most once) to thisConnectableObservablewhen the firstObserversubscribes.
The connection happens after the first subscription and happens at most once during the lifetime of the returned
Observable. If thisConnectableObservableterminates, the connection is never renewed, no matter howObservers come and go. UserefCount()to renew a connection or dispose an active connection when allObservers have disposed theirDisposables.This overload does not allow disconnecting the connection established via
connect(Consumer). Use theautoConnect(int, Consumer)overload to gain access to theDisposablerepresenting the only connection.- Scheduler:
autoConnectoverload does not operate on any particularScheduler.
- Returns:
- a new
Observableinstance that automatically connects to thisConnectableObservablewhen the firstObserversubscribes
-
autoConnect
@NonNull @CheckReturnValue @SchedulerSupport("none") public @NonNull Observable<T> autoConnect(int numberOfObservers) Returns anObservablethat automatically connects (at most once) to thisConnectableObservablewhen the specified number ofObservers subscribe to it.
The connection happens after the given number of subscriptions and happens at most once during the lifetime of the returned
Observable. If thisConnectableObservableterminates, the connection is never renewed, no matter howObservers come and go. UserefCount()to renew a connection or dispose an active connection when allObservers have disposed theirDisposables.This overload does not allow disconnecting the connection established via
connect(Consumer). Use theautoConnect(int, Consumer)overload to gain access to theDisposablerepresenting the only connection.- Scheduler:
autoConnectoverload does not operate on any particularScheduler.
- Parameters:
numberOfObservers- the number of subscribers to await before calling connect on theConnectableObservable. A non-positive value indicates an immediate connection.- Returns:
- a new
Observableinstance that automatically connects to thisConnectableObservablewhen the specified number ofObservers subscribe to it
-
autoConnect
@NonNull @CheckReturnValue @SchedulerSupport("none") public @NonNull Observable<T> autoConnect(int numberOfObservers, @NonNull @NonNull Consumer<? super Disposable> connection) Returns anObservablethat automatically connects (at most once) to thisConnectableObservablewhen the specified number ofObservers subscribe to it and calls the specified callback with theDisposableassociated with the established connection.
The connection happens after the given number of subscriptions and happens at most once during the lifetime of the returned
Observable. If thisConnectableObservableterminates, the connection is never renewed, no matter howObservers come and go. UserefCount()to renew a connection or dispose an active connection when allObservers have disposed theirDisposables.- Scheduler:
autoConnectoverload does not operate on any particularScheduler.
- Parameters:
numberOfObservers- the number of subscribers to await before calling connect on theConnectableObservable. A non-positive value indicates an immediate connection.connection- the callbackConsumerthat will receive theDisposablerepresenting the established connection- Returns:
- a new
Observableinstance that automatically connects to thisConnectableObservablewhen the specified number ofObservers subscribe to it and calls the specified callback with theDisposableassociated with the established connection - Throws:
NullPointerException- ifconnectionisnull
-