Subject

A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.

Because a Subject subscribes to an Observable, it will trigger that Observable to begin emitting items (if that Observable is “cold” — that is, if it waits for a subscription before it begins to emit items). This can have the effect of making the resulting Subject a “hot” Observable variant of the original “cold” Observable.
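
The dual role described above can be sketched with a toy model. This is a minimal illustration in Python, not the API of any ReactiveX library: the names ToySubject and cold_source are hypothetical, and error and completion handling are left out.

```python
class ToySubject:
    """Toy model of a Subject: an observer and an Observable at once."""

    def __init__(self):
        self._observers = []

    # Observable side: downstream observers register here.
    def subscribe(self, on_next):
        self._observers.append(on_next)

    # Observer side: items arriving here are re-emitted downstream.
    def on_next(self, item):
        for observer in list(self._observers):
            observer(item)


def cold_source(on_next):
    """A hypothetical 'cold' source: it emits only once something subscribes."""
    for item in (1, 2, 3):
        on_next(item)


subject = ToySubject()
received = []
subject.subscribe(received.append)  # a downstream observer attaches first
cold_source(subject.on_next)        # subscribing the subject starts the emissions
subject.on_next(99)                 # the subject can also emit new items itself
print(received)                     # [1, 2, 3, 99]
```

Because the cold source starts emitting as soon as the subject subscribes to it, any observer that attaches to the subject afterwards would miss those items, which is the "hot" behavior the paragraph describes.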

Concurrent usage

In most ReactiveX implementations, especially those that can run in a multi-threaded environment, the observer side of a Subject is not considered thread safe. The Observable side, i.e. Subscribe(), is always thread safe, however.

This means that calling OnNext, OnError or OnCompleted from multiple threads can result in an undefined state.

Most ReactiveX implementations therefore offer a special operator that makes the observer side thread safe as well. Look for the ToSerialized operator.
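
As a rough idea of what serializing the observer side means, the sketch below guards every call with a lock so that only one thread at a time reaches the wrapped observer. It is a Python toy under stated assumptions: SerializedObserver is a hypothetical name, and actual ToSerialized implementations may use a different mechanism than a plain lock.

```python
import threading


class SerializedObserver:
    """Hypothetical wrapper: lets only one thread at a time call the observer."""

    def __init__(self, on_next):
        self._on_next = on_next
        self._lock = threading.Lock()

    def on_next(self, item):
        # The lock turns concurrent calls into sequential ones.
        with self._lock:
            self._on_next(item)


received = []
serialized = SerializedObserver(received.append)


def produce(start):
    for i in range(start, start + 1000):
        serialized.on_next(i)


# Four producer threads push into the same observer concurrently.
threads = [threading.Thread(target=produce, args=(n * 1000,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(received))  # 4000
```

The order in which items from different threads interleave is still not deterministic; the wrapper only guarantees that the calls never overlap.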

Varieties of Subject

There are four varieties of Subject that are designed for particular use cases. Not all of these are available in all implementations, and some implementations use other naming conventions (for example, in RxScala, what is called a “PublishSubject” here is known simply as a “Subject”):

AsyncSubject

An AsyncSubject emits the last value (and only the last value) emitted by the source Observable, and only after that source Observable completes. (If the source Observable does not emit any values, the AsyncSubject also completes without emitting any values.)

It will also emit this same final value to any subsequent observers. However, if the source Observable terminates with an error, the AsyncSubject will not emit any items, but will simply pass along the error notification from the source Observable.
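
The rules above can be modeled in a few lines. This is a toy sketch in Python, not a library implementation: ToyAsyncSubject is a hypothetical name, and events are represented as tuples such as ("next", item) purely for illustration.

```python
class ToyAsyncSubject:
    """Toy model: emits only the last item, and only on successful completion."""

    def __init__(self):
        self._observers = []
        self._last = None      # most recent ("next", item) event, if any
        self._terminal = None  # ("completed",) or ("error", exc) once finished

    def subscribe(self, observer):
        if self._terminal is None:
            self._observers.append(observer)
        else:
            self._deliver(observer)  # late observers get the same outcome

    def on_next(self, item):
        if self._terminal is None:
            self._last = ("next", item)  # remember it, but emit nothing yet

    def on_completed(self):
        self._finish(("completed",))

    def on_error(self, error):
        self._finish(("error", error))

    def _finish(self, terminal):
        if self._terminal is None:
            self._terminal = terminal
            for observer in self._observers:
                self._deliver(observer)
            self._observers.clear()

    def _deliver(self, observer):
        # The final item is passed on only if the source completed normally.
        if self._terminal[0] == "completed" and self._last is not None:
            observer(self._last)
        observer(self._terminal)


subject = ToyAsyncSubject()
early, late = [], []
subject.subscribe(early.append)
for item in (1, 2, 3):
    subject.on_next(item)
subject.on_completed()
subject.subscribe(late.append)  # subscribes after completion

print(early)  # [('next', 3), ('completed',)]
print(late)   # [('next', 3), ('completed',)]
```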

BehaviorSubject

When an observer subscribes to a BehaviorSubject, the Subject begins by emitting to that observer the item most recently emitted by the source Observable (or a seed/default value if none has yet been emitted), and then continues to emit any other items emitted later by the source Observable(s).

However, if the source Observable terminates with an error, the BehaviorSubject will not emit any items to subsequent observers, but will simply pass along the error notification from the source Observable.
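
A toy model of this behavior is sketched below in Python. ToyBehaviorSubject is a hypothetical name, events are shown as tuples for illustration, and only the error path described above is modeled.

```python
class ToyBehaviorSubject:
    """Toy model: new observers first receive the latest item (or the seed)."""

    def __init__(self, seed):
        self._latest = seed
        self._observers = []
        self._terminal = None  # ("error", exc) once the source has failed

    def subscribe(self, observer):
        if self._terminal is not None:
            observer(self._terminal)  # after an error: only the error is passed on
            return
        observer(("next", self._latest))
        self._observers.append(observer)

    def on_next(self, item):
        if self._terminal is None:
            self._latest = item
            for observer in list(self._observers):
                observer(("next", item))

    def on_error(self, error):
        if self._terminal is None:
            self._terminal = ("error", error)
            for observer in self._observers:
                observer(self._terminal)
            self._observers.clear()


subject = ToyBehaviorSubject(seed="default")
first, second, third = [], [], []

subject.subscribe(first.append)   # nothing emitted yet, so it gets the seed
subject.on_next("a")
subject.subscribe(second.append)  # starts with the most recent item, "a"
subject.on_next("b")
failure = RuntimeError("source failed")
subject.on_error(failure)
subject.subscribe(third.append)   # receives only the error notification

print([event[0:2] for event in second])
```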

PublishSubject

PublishSubject emits to an observer only those items that are emitted by the source Observable(s) subsequent to the time of the subscription.

Note that a PublishSubject may begin emitting items immediately upon creation (unless you have taken steps to prevent this), and so there is a risk that one or more items may be lost between the time the Subject is created and the observer subscribes to it. If you need to guarantee delivery of all items from the source Observable, you’ll need either to form that Observable with Create so that you can manually reintroduce “cold” Observable behavior (checking to see that all observers have subscribed before beginning to emit items), or switch to using a ReplaySubject instead.

If the source Observable terminates with an error, the PublishSubject will not emit any items to subsequent observers, but will simply pass along the error notification from the source Observable.
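
The risk of losing items can be seen in a small toy model. The Python sketch below is an illustration only: ToyPublishSubject is a hypothetical name, and events are represented as tuples.

```python
class ToyPublishSubject:
    """Toy model: observers see only what is emitted after they subscribe."""

    def __init__(self):
        self._observers = []
        self._terminal = None  # ("error", exc) once the source has failed

    def subscribe(self, observer):
        if self._terminal is not None:
            observer(self._terminal)  # late observers get just the error
        else:
            self._observers.append(observer)

    def on_next(self, item):
        if self._terminal is None:
            for observer in list(self._observers):
                observer(("next", item))

    def on_error(self, error):
        if self._terminal is None:
            self._terminal = ("error", error)
            for observer in self._observers:
                observer(self._terminal)
            self._observers.clear()


subject = ToyPublishSubject()
subject.on_next(1)  # nobody is subscribed yet, so this item is lost

a, b, c = [], [], []
subject.subscribe(a.append)
subject.on_next(2)
subject.subscribe(b.append)
subject.on_next(3)
failure = ValueError("source failed")
subject.on_error(failure)
subject.subscribe(c.append)

print(len(a), len(b), len(c))  # 3 2 1
```

Observer a never sees item 1, and observer b never sees items 1 or 2; observer c, which subscribes after the error, receives only the error notification.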

ReplaySubject

ReplaySubject emits to any observer all of the items that were emitted by the source Observable(s), regardless of when the observer subscribes.

There are also versions of ReplaySubject that will throw away old items once the replay buffer threatens to grow beyond a certain size, or when a specified timespan has passed since the items were originally emitted.
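
Both the unbounded and the size-bounded behavior can be sketched with a buffer. This Python toy is an assumption-laden illustration: ToyReplaySubject and its buffer_size parameter are hypothetical names, terminal events are omitted, and the time-bounded variant is not modeled.

```python
from collections import deque


class ToyReplaySubject:
    """Toy model: replays buffered items to every new observer."""

    def __init__(self, buffer_size=None):
        # deque(maxlen=None) is unbounded; with a maxlen, the oldest items
        # are discarded once the buffer is full.
        self._buffer = deque(maxlen=buffer_size)
        self._observers = []

    def subscribe(self, observer):
        for item in self._buffer:  # replay first ...
            observer(item)
        self._observers.append(observer)  # ... then relay live items

    def on_next(self, item):
        self._buffer.append(item)
        for observer in list(self._observers):
            observer(item)


unbounded = ToyReplaySubject()
bounded = ToyReplaySubject(buffer_size=2)
for item in (1, 2, 3):
    unbounded.on_next(item)
    bounded.on_next(item)

all_items, recent_items = [], []
unbounded.subscribe(all_items.append)
bounded.subscribe(recent_items.append)
unbounded.on_next(4)
bounded.on_next(4)

print(all_items)     # [1, 2, 3, 4]
print(recent_items)  # [2, 3, 4]
```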

If you use a ReplaySubject as an observer, take care not to call its onNext method (or its other on methods) from multiple threads, as this could lead to coincident (non-sequential) calls, which violates the Observable contract and creates an ambiguity in the resulting Subject as to which item or notification should be replayed first.

Other subject types

In certain ReactiveX flavors and versions, such as RxJava 3.x, a few more subject types are available that fill some additional common roles.

UnicastSubject

A Subject that queues up events until a single Observer subscribes to it, replays those events to it until the Observer catches up and then switches to relaying events live to this single Observer until this UnicastSubject terminates or the Observer disposes.
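
The queue-then-relay behavior can be sketched as follows. This is a Python toy, not RxJava's implementation: ToyUnicastSubject is a hypothetical name, termination and disposal are omitted, and rejecting a second observer by raising an exception is an assumption made for this sketch.

```python
class ToyUnicastSubject:
    """Toy model: buffers items until its single observer arrives."""

    def __init__(self):
        self._queue = []
        self._observer = None

    def subscribe(self, observer):
        if self._observer is not None:
            # Assumption of this sketch: a second observer is rejected outright.
            raise RuntimeError("only a single observer is allowed")
        for item in self._queue:  # let the observer catch up first
            observer(item)
        self._queue.clear()
        self._observer = observer  # from now on, relay items live

    def on_next(self, item):
        if self._observer is None:
            self._queue.append(item)
        else:
            self._observer(item)


subject = ToyUnicastSubject()
subject.on_next(1)  # queued: no observer yet
subject.on_next(2)  # queued

received = []
subject.subscribe(received.append)  # replays 1 and 2
subject.on_next(3)                  # relayed live

print(received)  # [1, 2, 3]
```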

SingleSubject

Represents a hot Single-like source and consumer of events similar to Subjects. Since a Single can only ever emit an item or error, a SingleSubject is implicitly a replay-like subject.
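
Why a single-outcome subject is implicitly replay-like can be seen in a toy model: once the one outcome is known, it is stored and handed to every later observer. The Python sketch below is an illustration only, and ToySingleSubject and the tuple-shaped outcomes are hypothetical.

```python
class ToySingleSubject:
    """Toy model: holds exactly one outcome and hands it to every observer."""

    def __init__(self):
        self._observers = []
        self._outcome = None  # ("success", item) or ("error", exc)

    def subscribe(self, observer):
        if self._outcome is None:
            self._observers.append(observer)
        else:
            observer(self._outcome)  # already settled: replay the outcome

    def on_success(self, item):
        self._settle(("success", item))

    def on_error(self, error):
        self._settle(("error", error))

    def _settle(self, outcome):
        if self._outcome is None:  # only the first outcome counts
            self._outcome = outcome
            for observer in self._observers:
                observer(outcome)
            self._observers.clear()


subject = ToySingleSubject()
early, late = [], []
subject.subscribe(early.append)
subject.on_success(42)
subject.subscribe(late.append)  # subscribes after the outcome is known

print(early)  # [('success', 42)]
print(late)   # [('success', 42)]
```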

MaybeSubject

Represents a hot Maybe-like source and consumer of events similar to Subjects. Since a Maybe can only ever emit an item, an error or become completed, a MaybeSubject is implicitly a replay-like subject.

CompletableSubject

Represents a hot Completable-like source and consumer of events similar to Subjects. Since a Completable can only ever complete or hold an error, a CompletableSubject is implicitly a replay-like subject.

Processors

RxJava 2.x and RxJava 3.x define backpressure-aware subjects as Processors, named much like the Subjects above. They behave in much the same way, except that they will not overflow subscribers that have not requested more items. In general, these processors do not coordinate between their subscribers and may fail them individually if they cannot keep up.
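
The idea of failing subscribers individually can be sketched with a toy demand counter. This Python model is an illustration under assumptions, not RxJava's implementation: ToySubscription, ToyPublishProcessor and the "missing backpressure" error string are hypothetical.

```python
class ToySubscription:
    """Tracks how many items one subscriber has asked for."""

    def __init__(self, subscriber):
        self.subscriber = subscriber
        self.requested = 0

    def request(self, n):
        self.requested += n


class ToyPublishProcessor:
    """Toy model: never emits to a subscriber beyond its requested amount."""

    def __init__(self):
        self._subscriptions = []

    def subscribe(self, subscriber):
        subscription = ToySubscription(subscriber)
        self._subscriptions.append(subscription)
        return subscription

    def on_next(self, item):
        for sub in list(self._subscriptions):
            if sub.requested > 0:
                sub.requested -= 1
                sub.subscriber(("next", item))
            else:
                # No outstanding demand: fail this subscriber only,
                # without affecting the others.
                self._subscriptions.remove(sub)
                sub.subscriber(("error", "missing backpressure"))


processor = ToyPublishProcessor()
fast, slow = [], []
processor.subscribe(fast.append).request(10)  # can take ten items
processor.subscribe(slow.append).request(1)   # can take only one

for item in (1, 2, 3):
    processor.on_next(item)

print(fast)  # [('next', 1), ('next', 2), ('next', 3)]
print(slow)  # [('next', 1), ('error', 'missing backpressure')]
```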

A special processor, MulticastProcessor, does coordinate between its subscribers with respect to backpressure.

The SingleSubject, MaybeSubject and CompletableSubject subject types do not have processor variants because these do not need to support backpressure and can always hold at most one element.

Processors also implement the Reactive Streams Processor interface and thus they are compatible across the Reactive Streams ecosystem in Java.

Language-Specific Information:

If you have a Subject and you want to pass it along to some other agent without exposing its Subscriber interface, you can mask it by calling its asObservable method (available in RxJava 1.x, for example), which will return the Subject as a pure Observable.

If you have a Subject and you want to pass it along to some other agent without exposing its Observer interface, you can mask it by calling its hide method (available in RxJava 2.x and 3.x, for example), which will return the Subject as a pure Observable.
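
The masking idea can be sketched as a wrapper that forwards only the subscribe side. This is a Python toy, not the RxJava method itself: ToySubject and ReadOnlyView are hypothetical names, and the hide method here merely mirrors the name used above.

```python
class ReadOnlyView:
    """Exposes only the Observable side of a subject."""

    def __init__(self, subscribe):
        self._subscribe = subscribe

    def subscribe(self, observer):
        self._subscribe(observer)


class ToySubject:
    """Toy subject with both an observer side and an Observable side."""

    def __init__(self):
        self._observers = []

    def subscribe(self, observer):
        self._observers.append(observer)

    def on_next(self, item):
        for observer in list(self._observers):
            observer(item)

    def hide(self):
        # Hand out a view that has no on_next method at all.
        return ReadOnlyView(self.subscribe)


subject = ToySubject()
view = subject.hide()

received = []
view.subscribe(received.append)  # consumers can still subscribe ...
subject.on_next("hello")         # ... but only the owner can emit

print(received)                  # ['hello']
print(hasattr(view, "on_next"))  # False
```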
