Introduction

RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects), and operators inspired by Array#extras (map, filter, reduce, every, etc.) that allow asynchronous events to be handled as collections.

Think of RxJS as Lodash for events.

ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.

The essential concepts in RxJS which solve async event management are:

  • Observable: represents the idea of an invokable collection of future values or events.
  • Observer: is a collection of callbacks that knows how to listen to values delivered by the Observable.
  • Subscription: represents the execution of an Observable, and is primarily useful for cancelling the execution.
  • Operators: are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, flatMap, etc.
  • Subject: is equivalent to an EventEmitter, and is the only way of multicasting a value or event to multiple Observers.
  • Schedulers: are centralized dispatchers to control concurrency, allowing us to coordinate when computation happens on e.g. setTimeout or requestAnimationFrame or others.

First examples

Normally you register event listeners.

var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

Using RxJS you create an observable instead.

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .subscribe(() => console.log('Clicked!'));

Purity

What makes RxJS powerful is its ability to produce values using pure functions. That means your code is less prone to errors.

Normally you would create an impure function, where other pieces of your code can mess up your state.

var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked ${++count} times`));

Using RxJS you isolate the state.

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .scan(count => count + 1, 0)
  .subscribe(count => console.log(`Clicked ${count} times`));

The scan operator works like reduce for arrays, except that it emits every intermediate result instead of only the final one. It takes an initial value which is passed to a callback. The value returned by the callback then becomes the accumulated value passed to the callback the next time it runs.
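
The difference between reduce and a scan-style accumulation can be sketched with plain arrays. The scanArray helper below is hypothetical and exists only for this illustration; it is not part of RxJS. Where reduce returns only the final accumulated value, scanArray keeps every intermediate value, which is why the subscriber above sees a running click count.

```javascript
// Hypothetical helper for illustration: an array version of scan.
function scanArray(values, accumulate, seed) {
  const results = [];
  let state = seed;
  for (const value of values) {
    state = accumulate(state, value);
    results.push(state); // keep every intermediate accumulation
  }
  return results;
}

const clicks = ['click', 'click', 'click'];

const total = clicks.reduce(count => count + 1, 0);
const running = scanArray(clicks, count => count + 1, 0);

console.log(total);   // 3
console.log(running); // [1, 2, 3]
```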

Flow

RxJS has a whole range of operators that help you control how events flow through your observables.

This is how you would allow at most one click per second, with plain JavaScript:

var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', () => {
  if (Date.now() - lastClick >= rate) {
    console.log(`Clicked ${++count} times`);
    lastClick = Date.now();
  }
});

With RxJS:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .scan(count => count + 1, 0)
  .subscribe(count => console.log(`Clicked ${count} times`));

Other flow control operators are filter, delay, debounceTime, take, takeUntil, distinct, distinctUntilChanged, etc.

Values

You can transform the values passed through your observables.

Here's how you can add the current mouse x position for every click, in plain JavaScript:

var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', (event) => {
  if (Date.now() - lastClick >= rate) {
    count += event.clientX;
    console.log(count);
    lastClick = Date.now();
  }
});

With RxJS:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .map(event => event.clientX)
  .scan((count, clientX) => count + clientX, 0)
  .subscribe(count => console.log(count));

Other value-producing operators are pluck, pairwise, sample, etc.

Observable

Observables are lazy Push collections of multiple values. They fill the missing spot in the following table:

        Single      Multiple
Pull    Function    Iterator
Push    Promise     Observable

Example. The following is an Observable that pushes the values 1, 2, 3 immediately (synchronously) when subscribed, and the value 4 after one second has passed since the subscribe call, then completes:

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

To invoke the Observable and see these values, we need to subscribe to it:

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

Which executes as such on the console:

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

Pull versus Push

Pull and Push are two different protocols that describe how a data Producer can communicate with a data Consumer.

What is Pull? In Pull systems, the Consumer determines when it receives data from the data Producer. The Producer itself is unaware of when the data will be delivered to the Consumer.

Every JavaScript Function is a Pull system. The function is a Producer of data, and the code that calls the function is consuming it by "pulling" out a single return value from its call.

ES2015 introduced generator functions and iterators (function*), another type of Pull system. Code that calls iterator.next() is the Consumer, "pulling" out multiple values from the iterator (the Producer).
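
As a small illustration of the Pull protocol, the sketch below uses a generator named countTo (a made-up name for this example). The consuming code decides when each value is produced by calling next().

```javascript
// Producer: a generator that yields the numbers 1..limit on demand.
function* countTo(limit) {
  for (let i = 1; i <= limit; i++) {
    yield i;
  }
}

const iterator = countTo(3);
const pulled = [];

// Consumer: each call to next() pulls exactly one value out of the Producer.
let step = iterator.next();
while (!step.done) {
  pulled.push(step.value);
  step = iterator.next();
}

console.log(pulled); // [1, 2, 3]
```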

        Producer                                  Consumer
Pull    Passive: produces data when requested.    Active: decides when data is requested.
Push    Active: produces data at its own pace.    Passive: reacts to received data.

What is Push? In Push systems, the Producer determines when to send data to the Consumer. The Consumer is unaware of when it will receive that data.

Promises are the most common type of Push system in JavaScript today. A Promise (the Producer) delivers a resolved value to registered callbacks (the Consumers), but unlike functions, it is the Promise which is in charge of determining precisely when that value is "pushed" to the callbacks.

RxJS introduces Observables, a new Push system for JavaScript. An Observable is a Producer of multiple values, "pushing" them to Observers (Consumers).

  • A Function is a lazily evaluated computation that synchronously returns a single value on invocation.
  • A generator is a lazily evaluated computation that synchronously returns zero to (potentially) infinite values on iteration.
  • A Promise is a computation that may (or may not) eventually return a single value.
  • An Observable is a lazily evaluated computation that can synchronously or asynchronously return zero to (potentially) infinite values from the time it's invoked onwards.
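
The laziness distinction in the list above can be observed directly. In this sketch (the names log, lazyFunction and eagerPromise are illustrative), the Promise executor runs as soon as the Promise is constructed, whereas the function body runs only when the function is called.

```javascript
const log = [];

// A function is lazy: defining it runs nothing.
function lazyFunction() {
  log.push('function body ran');
  return 42;
}

// A Promise is eager: the executor runs synchronously during construction.
const eagerPromise = new Promise(resolve => {
  log.push('promise executor ran');
  resolve(42);
});

// Snapshot taken before the function has ever been called.
const logBeforeCall = log.slice();

const value = lazyFunction(); // only now does the function body run

console.log(logBeforeCall); // ['promise executor ran']
console.log(log);           // ['promise executor ran', 'function body ran']
```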

Observables as generalizations of functions

Contrary to popular claims, Observables are not like EventEmitters nor are they like Promises for multiple values. Observables may act like EventEmitters in some cases, namely when they are multicasted using RxJS Subjects, but usually they don't act like EventEmitters.

Observables are like functions with zero arguments, but generalize those to allow multiple values.

Consider the following:

function foo() {
  console.log('Hello');
  return 42;
}

var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);

We expect to see as output:

"Hello"
42
"Hello"
42

You can write the same behavior above, but with Observables:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x) {
  console.log(x);
});
foo.subscribe(function (y) {
  console.log(y);
});

And the output is the same:

"Hello"
42
"Hello"
42

This happens because both functions and Observables are lazy computations. If you don't call the function, the console.log('Hello') won't happen. Also with Observables, if you don't "call" it (with subscribe), the console.log('Hello') won't happen. Plus, "calling" or "subscribing" is an isolated operation: two function calls trigger two separate side effects, and two Observable subscribes trigger two separate side effects. As opposed to EventEmitters which share the side effects and have eager execution regardless of the existence of subscribers, Observables have no shared execution and are lazy.

Subscribing to an Observable is analogous to calling a Function.

Some people claim that Observables are asynchronous. That is not true. If you surround a function call with logs, like this:

console.log('before');
console.log(foo.call());
console.log('after');

You will see the output:

"before"
"Hello"
42
"after"

And this is the same behavior with Observables:

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

And the output is:

"before"
"Hello"
42
"after"

Which proves the subscription of foo was entirely synchronous, just like a function.

Observables are able to deliver values either synchronously or asynchronously.

What is the difference between an Observable and a function? Observables can "return" multiple values over time, something which functions cannot. You can't do this:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen
}

Functions can only return one value. Observables, however, can do this:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200); // "return" yet another
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

With synchronous output:

"before"
"Hello"
42
100
200
"after"

But you can also "return" values asynchronously:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300); // happens asynchronously
  }, 1000);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

With output:

"before"
"Hello"
42
100
200
"after"
300

Conclusion:

  • func.call() means "give me one value synchronously"
  • observable.subscribe() means "give me any amount of values, either synchronously or asynchronously"

Anatomy of an Observable

Observables are created using Rx.Observable.create or a creation operator, are subscribed to with an Observer, execute to deliver next / error / complete notifications to the Observer, and their execution may be disposed. These four aspects are all encoded in an Observable instance, but some of these aspects are related to other types, like Observer and Subscription.

Core Observable concerns:

  • Creating Observables
  • Subscribing to Observables
  • Executing the Observable
  • Disposing Observables

Creating Observables

Rx.Observable.create is an alias for the Observable constructor, and it takes one argument: the subscribe function.

The following example creates an Observable to emit the string 'hi' every second to an Observer.

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next('hi');
  }, 1000);
});

Observables can be created with create, but usually we use the so-called creation operators, like of, from, interval, etc.

In the example above, the subscribe function is the most important piece to describe the Observable. Let's look at what subscribing means.

Subscribing to Observables

The Observable observable in the example can be subscribed to, like this:

observable.subscribe(x => console.log(x));

It is not a coincidence that observable.subscribe and subscribe in Observable.create(function subscribe(observer) {...}) have the same name. In the library, they are different, but for practical purposes you can consider them conceptually equal.

This shows how subscribe calls are not shared among multiple Observers of the same Observable. When calling observable.subscribe with an Observer, the function subscribe in Observable.create(function subscribe(observer) {...}) is run for that given Observer. Each call to observable.subscribe triggers its own independent setup for that given Observer.

Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.

This is drastically different to event handler APIs like addEventListener / removeEventListener. With observable.subscribe, the given Observer is not registered as a listener in the Observable. The Observable does not even maintain a list of attached Observers.

A subscribe call is simply a way to start an "Observable execution" and deliver values or events to an Observer of that execution.
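
To make this concrete, here is a minimal sketch of what subscribing amounts to. TinyObservable is a hypothetical stand-in written for this illustration, not the RxJS implementation, which additionally enforces the Observable Contract and handles teardown. It keeps no list of Observers; every subscribe call simply runs the subscribe function again for the Observer it was given.

```javascript
// Hypothetical, stripped-down Observable: it only remembers the subscribe function.
class TinyObservable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observer) {
    // No registry of Observers: just run the setup for this one Observer.
    return this._subscribe(observer);
  }
}

let executions = 0;
const source = new TinyObservable(observer => {
  executions += 1;
  observer.next('run #' + executions);
});

const received = [];
source.subscribe({ next: v => received.push('A got ' + v) });
source.subscribe({ next: v => received.push('B got ' + v) });

console.log(executions); // 2
console.log(received);   // ['A got run #1', 'B got run #2']
```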

Executing Observables

The code inside Observable.create(function subscribe(observer) {...}) represents an "Observable execution", a lazy computation that only happens for each Observer that subscribes. The execution produces multiple values over time, either synchronously or asynchronously.

There are three types of values an Observable Execution can deliver:

  • "Next" notification: sends a value such as a Number, a String, an Object, etc.
  • "Error" notification: sends a JavaScript Error or exception.
  • "Complete" notification: does not send a value.

Next notifications are the most important and most common type: they represent actual data being delivered to an Observer. Error and Complete notifications may happen only once during the Observable Execution, and only one of the two can occur.

These constraints are expressed best in the so-called Observable Grammar or Contract, written as a regular expression:

next*(error|complete)?

In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.

The following is an example of an Observable execution that delivers three Next notifications, then completes:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

Observables strictly adhere to the Observable Contract, so the following code would not deliver the Next notification 4:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4); // Is not delivered because it would violate the contract
});
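
One way such a guarantee could be enforced is by wrapping the Observer so that nothing passes through after a terminal notification. The makeSafeObserver helper below is an assumption made for illustration; the mechanism inside RxJS is more involved.

```javascript
// Hypothetical wrapper that enforces next*(error|complete)? for one Observer.
function makeSafeObserver(observer) {
  let stopped = false;
  return {
    next(value) {
      if (!stopped && observer.next) observer.next(value);
    },
    error(err) {
      if (stopped) return;
      stopped = true; // terminal: nothing may follow
      if (observer.error) observer.error(err);
    },
    complete() {
      if (stopped) return;
      stopped = true; // terminal: nothing may follow
      if (observer.complete) observer.complete();
    }
  };
}

const delivered = [];
const safe = makeSafeObserver({
  next: v => delivered.push(v),
  complete: () => delivered.push('done')
});

safe.next(1);
safe.next(2);
safe.next(3);
safe.complete();
safe.next(4); // ignored: the execution has already completed

console.log(delivered); // [1, 2, 3, 'done']
```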

It is a good idea to wrap any code in subscribe with a try/catch block that will deliver an Error notification if it catches an exception:

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // delivers an error if it caught one
  }
});
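
The following sketch shows the pattern reacting to a real exception. It uses a plain subscribe function rather than Rx.Observable.create so that it stands alone, and the failing JSON.parse call is just a convenient way to raise an exception.

```javascript
function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(JSON.parse('{not valid json')); // throws a SyntaxError
    observer.complete(); // never reached
  } catch (err) {
    observer.error(err); // the exception becomes an Error notification
  }
}

const notifications = [];
subscribe({
  next: v => notifications.push('next ' + v),
  error: err => notifications.push('error ' + err.name),
  complete: () => notifications.push('complete')
});

console.log(notifications); // ['next 1', 'error SyntaxError']
```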

Disposing Observable Executions

Because Observable Executions may be infinite, and it's common for an Observer to want to abort execution in finite time, we need an API for canceling an execution. Since each execution is exclusive to one Observer only, once the Observer is done receiving values, it has to have a way to stop the execution, in order to avoid wasting computation power or memory resources.

When observable.subscribe is called, the Observer gets attached to the newly created Observable execution, but also this call returns an object, the Subscription:

var subscription = observable.subscribe(x => console.log(x));

The Subscription represents the ongoing execution, and has a minimal API which allows you to cancel that execution. Read more about the Subscription type in the Subscription section. With subscription.unsubscribe() you can cancel the ongoing execution:

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();

When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.

Each Observable must define how to dispose resources of that execution when we create the Observable using create(). You can do that by returning a custom unsubscribe function from within function subscribe().

For instance, this is how we clear an interval execution set with setInterval:

var observable = Rx.Observable.create(function subscribe(observer) {
  // Keep track of the interval resource
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  // Provide a way of canceling and disposing the interval resource
  return function unsubscribe() {
    clearInterval(intervalID);
  };
});

Just like observable.subscribe resembles Observable.create(function subscribe() {...}), the unsubscribe we return from subscribe is conceptually equal to subscription.unsubscribe. In fact, if we remove the ReactiveX types surrounding these concepts, we're left with rather straightforward JavaScript.

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// Later:
unsubscribe(); // dispose the resources

The reason why we use Rx types like Observable, Observer, and Subscription is to get safety (such as the Observable Contract) and composability with Operators.

Observer

What is an Observer? An Observer is a consumer of values delivered by an Observable. Observers are simply a set of callbacks, one for each type of notification delivered by the Observable: next, error, and complete. The following is an example of a typical Observer object:

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

To use the Observer, provide it to the subscribe of an Observable:

observable.subscribe(observer);

Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver.

Observers in RxJS may also be partial. If you don't provide one of the callbacks, the execution of the Observable will still happen normally, except some types of notifications will be ignored, because they don't have a corresponding callback in the Observer.

The example below is an Observer without the complete callback:

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
};

When subscribing to an Observable, you may also just provide the callbacks as arguments, without being attached to an Observer object, for instance like this:

observable.subscribe(x => console.log('Observer got a next value: ' + x));

Internally, observable.subscribe creates an Observer object using the first callback argument as the next handler. All three types of callbacks may be provided as arguments:

observable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);
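
That normalization step could look roughly like the following. toObserver is a hypothetical helper, not an RxJS export; it only shows how callback arguments and Observer objects can be treated uniformly.

```javascript
// Hypothetical helper: accept either an Observer object or up to three callbacks.
function toObserver(nextOrObserver, error, complete) {
  if (typeof nextOrObserver === 'function') {
    // Callback form: the first argument becomes the next handler.
    return { next: nextOrObserver, error: error, complete: complete };
  }
  // Object form: use the Observer as given (or an empty one).
  return nextOrObserver || {};
}

const received = [];
const fromCallback = toObserver(x => received.push('callback got ' + x));
const fromObject = toObserver({ next: x => received.push('object got ' + x) });

fromCallback.next(1);
fromObject.next(2);

console.log(received); // ['callback got 1', 'object got 2']
```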

Subscription

What is a Subscription? A Subscription is an object that represents a disposable resource, usually the execution of an Observable. A Subscription has one important method, unsubscribe, that takes no argument and just disposes the resource held by the subscription. In previous versions of RxJS, Subscription was called "Disposable".

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();

A Subscription essentially just has an unsubscribe() function to release resources or cancel Observable executions.

Subscriptions can also be put together, so that calling unsubscribe() on one Subscription unsubscribes several Subscriptions at once. You can do this by "adding" one subscription into another:

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription.unsubscribe();
}, 1000);

When executed, we see in the console:

second: 0
first: 0
second: 1
first: 1
second: 2

Subscriptions also have a remove(otherSubscription) method, in order to undo the addition of a child Subscription.
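
The parent/child relationship can be sketched without RxJS. TinySubscription below is a hypothetical class written for this illustration; it mirrors only the add, remove and unsubscribe behavior described above and is not how the library is implemented.

```javascript
// Hypothetical composite subscription: a teardown plus a list of children.
class TinySubscription {
  constructor(teardown) {
    this.closed = false;
    this._teardown = teardown;
    this._children = [];
  }
  add(child) {
    if (this.closed) {
      child.unsubscribe(); // parent already disposed: dispose the child too
    } else {
      this._children.push(child);
    }
    return child;
  }
  remove(child) {
    const index = this._children.indexOf(child);
    if (index !== -1) this._children.splice(index, 1);
  }
  unsubscribe() {
    if (this.closed) return;
    this.closed = true;
    if (this._teardown) this._teardown();
    this._children.forEach(child => child.unsubscribe());
    this._children = [];
  }
}

const disposed = [];
const parent = new TinySubscription(() => disposed.push('parent'));
const childA = new TinySubscription(() => disposed.push('childA'));
const childB = new TinySubscription(() => disposed.push('childB'));

parent.add(childA);
parent.add(childB);
parent.remove(childB); // childB is no longer tied to parent

parent.unsubscribe();
console.log(disposed); // ['parent', 'childA']
```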

Subject

What is a Subject? An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.

A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.

Every Subject is an Observable. Given a Subject, you can subscribe to it, providing an Observer, which will start receiving values normally. From the perspective of the Observer, it cannot tell whether the Observable execution is coming from a plain unicast Observable or a Subject.

Internally to the Subject, subscribe does not invoke a new execution that delivers values. It simply registers the given Observer in a list of Observers, similarly to how addListener usually works in other libraries and languages.

Every Subject is an Observer. It is an object with the methods next(v), error(e), and complete(). To feed a new value to the Subject, just call next(theValue), and it will be multicasted to the Observers registered to listen to the Subject.

In the example below, we have two Observers attached to a Subject, and we feed some values to the Subject:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

With the following output on the console:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
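
The registry behavior can be approximated in a few lines. TinySubject is a hypothetical class for illustration and omits error and complete handling; it shows only that subscribe adds an Observer to a list and that next delivers to every Observer currently in that list.

```javascript
// Hypothetical minimal Subject: a list of Observers plus a next() that fans out.
class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer); // register; no new execution is started
    return {
      unsubscribe: () => {
        const index = this.observers.indexOf(observer);
        if (index !== -1) this.observers.splice(index, 1);
      }
    };
  }
  next(value) {
    // Copy the list so that unsubscribing during delivery is safe.
    this.observers.slice().forEach(observer => observer.next(value));
  }
}

const output = [];
const subject = new TinySubject();
const subscriptionA = subject.subscribe({ next: v => output.push('observerA: ' + v) });
subject.subscribe({ next: v => output.push('observerB: ' + v) });

subject.next(1);
subscriptionA.unsubscribe(); // observerA leaves the registry
subject.next(2);

console.log(output); // ['observerA: 1', 'observerB: 1', 'observerB: 2']
```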

Since a Subject is an Observer, this also means you may provide a Subject as the argument to the subscribe of any Observable, like the example below shows:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject

Which executes as:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

With the approach above, we essentially just converted a unicast Observable execution to multicast, through the Subject. This demonstrates how Subjects are the only way of sharing an Observable execution among multiple Observers.

There are also a few specializations of the Subject type: BehaviorSubject, ReplaySubject, and AsyncSubject.

Multicasted Observables

A "multicasted Observable" passes notifications through a Subject which may have many subscribers, whereas a plain "unicast Observable" only sends notifications to a single Observer.

A multicasted Observable uses a Subject under the hood to make multiple Observers see the same Observable execution.

Under the hood, this is how the multicast operator works: Observers subscribe to an underlying Subject, and the Subject subscribes to the source Observable. The following example is similar to the previous example which used observable.subscribe(subject):

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

multicast returns an Observable that looks like a normal Observable, but works like a Subject when it comes to subscribing. multicast returns a ConnectableObservable, which is simply an Observable with the connect() method.

The connect() method is important to determine exactly when the shared Observable execution will start. Because connect() does source.subscribe(subject) under the hood, connect() returns a Subscription, which you can unsubscribe from in order to cancel the shared Observable execution.
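
The division of labor between subscribe and connect can be sketched as follows. makeConnectable is a hypothetical function for illustration (it is not the multicast operator, and it omits unsubscription): subscribing only registers an Observer, and nothing runs until connect() subscribes the internal fan-out object to the source.

```javascript
// Hypothetical connectable: subscribe() registers, connect() starts the source once.
function makeConnectable(subscribeToSource) {
  const observers = [];
  // Plays the role of the Subject: fans each value out to all Observers.
  const subject = {
    next: value => observers.forEach(observer => observer.next(value))
  };
  return {
    subscribe(observer) {
      observers.push(observer);
    },
    connect() {
      return subscribeToSource(subject);
    }
  };
}

let sourceExecutions = 0;
const output = [];
const multicasted = makeConnectable(observer => {
  sourceExecutions += 1;
  [1, 2, 3].forEach(value => observer.next(value));
});

multicasted.subscribe({ next: v => output.push('observerA: ' + v) });
multicasted.subscribe({ next: v => output.push('observerB: ' + v) });
const outputBeforeConnect = output.slice(); // still empty: nothing has run yet

multicasted.connect(); // the single shared execution starts here

console.log(sourceExecutions); // 1
console.log(output);
```

Both Observers see the same three values, yet the source function ran only once.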

Reference counting

Calling connect() manually and handling the Subscription is often cumbersome. Usually, we want to automatically connect when the first Observer arrives, and automatically cancel the shared execution when the last Observer unsubscribes.

Consider the following example where subscriptions occur as outlined by this list:

  1. First Observer subscribes to the multicasted Observable
  2. The multicasted Observable is connected
  3. The next value 0 is delivered to the first Observer
  4. Second Observer subscribes to the multicasted Observable
  5. The next value 1 is delivered to the first Observer
  6. The next value 1 is delivered to the second Observer
  7. First Observer unsubscribes from the multicasted Observable
  8. The next value 2 is delivered to the second Observer
  9. Second Observer unsubscribes from the multicasted Observable
  10. The connection to the multicasted Observable is unsubscribed

To achieve that with explicit calls to connect(), we write the following code:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

If we wish to avoid explicit calls to connect(), we can use ConnectableObservable's refCount() method (reference counting), which returns an Observable that keeps track of how many subscribers it has. When the number of subscribers increases from 0 to 1, it will call connect() for us, which starts the shared execution. Only when the number of subscribers decreases from 1 to 0 will it be fully unsubscribed, stopping further execution.

refCount makes the multicasted Observable automatically start executing when the first subscriber arrives, and stop executing when the last subscriber leaves.

Below is an example:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

Which executes with the output:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

The refCount() method only exists on ConnectableObservable, and it returns an Observable, not another ConnectableObservable.
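
The counting logic itself is small. The sketch below is an assumption about how reference counting could be written, not the library's code; refCountSketch and fakeConnectable are made-up names, and the fake connectable only records the calls it receives so the order of events can be inspected.

```javascript
// Hypothetical reference counter around any object with subscribe() and connect().
function refCountSketch(connectable) {
  let subscriberCount = 0;
  let connection = null;
  return {
    subscribe(observer) {
      const inner = connectable.subscribe(observer);
      subscriberCount += 1;
      if (subscriberCount === 1) {
        connection = connectable.connect(); // 0 -> 1: start the shared execution
      }
      let closed = false;
      return {
        unsubscribe() {
          if (closed) return; // ignore repeated calls
          closed = true;
          inner.unsubscribe();
          subscriberCount -= 1;
          if (subscriberCount === 0) {
            connection.unsubscribe(); // 1 -> 0: stop the shared execution
            connection = null;
          }
        }
      };
    }
  };
}

// A fake connectable that only records what happens to it.
const events = [];
const fakeConnectable = {
  subscribe() {
    events.push('observer added');
    return { unsubscribe: () => events.push('observer removed') };
  },
  connect() {
    events.push('connect');
    return { unsubscribe: () => events.push('disconnect') };
  }
};

const refCounted = refCountSketch(fakeConnectable);
const first = refCounted.subscribe({});
const second = refCounted.subscribe({});
first.unsubscribe();
second.unsubscribe();

console.log(events);
```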

BehaviorSubject

One of the variants of Subjects is the BehaviorSubject, which has a notion of "the current value". It stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it will immediately receive the "current value" from the BehaviorSubject.

BehaviorSubjects are useful for representing "values over time". For instance, an event stream of birthdays is a Subject, but the stream of a person's age would be a BehaviorSubject.

In the following example, the BehaviorSubject is initialized with the value 0 which the first Observer receives when it subscribes. The second Observer receives the value 2 even though it subscribed after the value 2 was sent.

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

With output:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
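
The "current value" idea can be sketched in plain JavaScript. TinyBehaviorSubject is a hypothetical class for illustration that leaves out unsubscription, error and complete; it remembers the latest value and hands it to each new Observer at subscription time.

```javascript
// Hypothetical minimal BehaviorSubject: remembers and replays the latest value.
class TinyBehaviorSubject {
  constructor(initialValue) {
    this.value = initialValue;
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
    observer.next(this.value); // deliver the current value immediately
  }
  next(value) {
    this.value = value; // becomes the new current value
    this.observers.forEach(observer => observer.next(value));
  }
}

const output = [];
const subject = new TinyBehaviorSubject(0);

subject.subscribe({ next: v => output.push('observerA: ' + v) });
subject.next(1);
subject.next(2);
subject.subscribe({ next: v => output.push('observerB: ' + v) });
subject.next(3);

console.log(output);
```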

ReplaySubject

A ReplaySubject is similar to a BehaviorSubject in that it can send old values to new subscribers, but it can also record a part of the Observable execution.

A ReplaySubject records multiple values from the Observable execution and replays them to new subscribers.

When creating a ReplaySubject, you can specify how many values to replay:

var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

With output:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
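
A bounded replay buffer is enough to reproduce this behavior. TinyReplaySubject below is a hypothetical sketch that supports only the buffer size (no window time, unsubscription, error or complete): it records the most recent values and replays them to each new Observer before registering it.

```javascript
// Hypothetical minimal ReplaySubject with a fixed-size buffer.
class TinyReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }
  subscribe(observer) {
    this.buffer.forEach(value => observer.next(value)); // replay recorded values
    this.observers.push(observer);
  }
  next(value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest recorded value
    }
    this.observers.forEach(observer => observer.next(value));
  }
}

const output = [];
const subject = new TinyReplaySubject(3);

subject.subscribe({ next: v => output.push('observerA: ' + v) });
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({ next: v => output.push('observerB: ' + v) });
subject.next(5);

console.log(output);
```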

You can also specify a window time in milliseconds, in addition to the buffer size, to determine how old the recorded values can be. In the following example we use a large buffer size of 100, but a window time parameter of just 500 milliseconds.

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

With the following output where the second Observer gets events 3, 4 and 5 that happened in the last 500 milliseconds prior to its subscription:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject

The AsyncSubject is a variant where only the last value of the Observable execution is sent to its observers, and only when the execution completes.

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

With output:

observerA: 5
observerB: 5

The AsyncSubject is similar to the last() operator, in that it waits for the complete notification in order to deliver a single value.
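A minimal sketch of the "remember the last value, deliver on complete" idea, written in plain JavaScript. TinyAsync is a hypothetical name and the class is only an illustration under simplifying assumptions (no error handling, no unsubscription), not the library's implementation.

```javascript
// Hypothetical stand-in for illustration only; not the RxJS implementation.
class TinyAsync {
  constructor() {
    this.hasValue = false;
    this.lastValue = undefined;
    this.completed = false;
    this.observers = [];
  }

  next(value) {
    if (this.completed) return;
    // Remember the value, but do not deliver it yet.
    this.hasValue = true;
    this.lastValue = value;
  }

  complete() {
    if (this.completed) return;
    this.completed = true;
    this.observers.forEach((observer) => this.deliver(observer));
    this.observers = [];
  }

  subscribe(observer) {
    if (this.completed) {
      // In this sketch, late subscribers still receive the final value.
      this.deliver(observer);
    } else {
      this.observers.push(observer);
    }
  }

  deliver(observer) {
    if (this.hasValue) observer.next(this.lastValue);
    if (observer.complete) observer.complete();
  }
}

const seen = [];
const last = new TinyAsync();
last.subscribe({ next: (v) => seen.push('A: ' + v) });
last.next(1);
last.next(2);
seen.push('nothing delivered yet');
last.complete();
last.subscribe({ next: (v) => seen.push('late: ' + v) });
console.log(seen);
```

The marker string is recorded before any value, which shows that next() alone delivers nothing; only complete() releases the last value.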

Operators

RxJS is mostly useful for its operators, even though the Observable is the foundation. Operators are the essential pieces that allow complex asynchronous code to be easily composed in a declarative manner.

What are operators?

Operators are methods on the Observable type, such as .map(...), .filter(...), .merge(...), etc. When called, they do not change the existing Observable instance. Instead, they return a new Observable, whose subscription logic is based on the first Observable.

An Operator is a function which creates a new Observable based on the current Observable. This is a pure operation: the previous Observable stays unmodified.

An Operator is essentially a pure function which takes one Observable as input and generates another Observable as output. Subscribing to the output Observable will also subscribe to the input Observable. In the following example, we create a custom operator function that multiplies each value received from the input Observable by 10:

function multiplyByTen(input) {
  var output = Rx.Observable.create(function subscribe(observer) {
    // Returning the inner subscription lets unsubscribing from output
    // also unsubscribe from input.
    return input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));

Which outputs:

10
20
30
40

Notice that subscribing to output also causes the input Observable to be subscribed. We call this an "operator subscription chain".
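The subscription chain can be made visible with a miniature stand-in for Observable that logs every subscribe call. Everything here (createObservable, the map helper, the names passed for logging) is hypothetical and exists only to illustrate the order in which subscriptions happen.

```javascript
// Hypothetical miniature Observable, for illustration only.
const log = [];

function createObservable(name, subscribeFn) {
  return {
    subscribe(observer) {
      log.push('subscribe to ' + name);
      return subscribeFn(observer);
    }
  };
}

// A source that emits 1, 2, 3 synchronously and then completes.
const source = createObservable('source', (observer) => {
  [1, 2, 3].forEach((v) => observer.next(v));
  observer.complete();
});

// Operator function: returns a new observable and leaves `input` untouched.
function map(input, project, name) {
  return createObservable(name, (observer) =>
    input.subscribe({
      next: (v) => observer.next(project(v)),
      complete: () => observer.complete()
    })
  );
}

const timesTen = map(source, (v) => 10 * v, 'timesTen');
const plusOne = map(timesTen, (v) => v + 1, 'plusOne');

log.push('nothing subscribed yet');
plusOne.subscribe({
  next: (v) => log.push('value ' + v),
  complete: () => log.push('done')
});
console.log(log.join('\n'));
```

Building the chain subscribes to nothing. The single subscribe call at the end travels from plusOne to timesTen to source, and only then do values flow back down.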

Instance operators versus static operators

What is an instance operator? Typically when referring to operators, we assume instance operators, which are methods on Observable instances. For instance, if multiplyByTen were an official instance operator, it would look roughly like this:

Rx.Observable.prototype.multiplyByTen = function multiplyByTen() {
  var input = this;
  return Rx.Observable.create(function subscribe(observer) {
    // Return the inner subscription so that unsubscribing propagates to input.
    return input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
};

Instance operators are functions that use the this keyword to determine the input Observable.

Notice how the input Observable is not a function argument anymore; it is assumed to be the this object. This is how we would use such an instance operator:

var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();

observable.subscribe(x => console.log(x));

What is a static operator? Besides instance operators, static operators are functions attached to the Observable class directly. A static operator uses no this keyword internally, but instead relies entirely on its arguments.

Static operators are pure functions attached to the Observable class, and are usually used to create Observables from scratch.

The most common type of static operator is the so-called Creation Operator. Instead of transforming an input Observable to an output Observable, creation operators simply take a non-Observable argument, like a number, and create a new Observable.

A typical example of a static creation operator would be the interval function. It takes a number (not an Observable) as input argument, and produces an Observable as output:

var observable = Rx.Observable.interval(1000 /* number of milliseconds */);

Another example of a creation operator is create, which we have been using extensively in previous examples. See the list of all static creation operators here.

However, static operators may serve purposes other than creation. Some Combination Operators may be static, such as merge, combineLatest, concat, etc. These make sense as static operators because they take multiple Observables as input, not just one. For instance:

var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);

var merged = Rx.Observable.merge(observable1, observable2);

Marble diagrams

To explain how operators work, textual descriptions are often not enough. Many operators are related to time: they may, for instance, delay, sample, throttle, or debounce value emissions in different ways. Diagrams are often a better tool for that. Marble Diagrams are visual representations of how operators work, and include the input Observable(s), the operator and its parameters, and the output Observable.

In a marble diagram, time flows to the right, and the diagram describes how values ("marbles") are emitted on the Observable execution.

Below you can see the anatomy of a marble diagram.

Throughout this documentation site, we extensively use marble diagrams to explain how operators work. They may be really useful in other contexts too, like on a whiteboard or even in our unit tests (as ASCII diagrams).
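To give a feel for how an ASCII marble diagram can drive a test, here is a small sketch that turns a diagram string into a list of timed events. The parseMarbles function is hypothetical and handles only a small subset of symbols, loosely modelled on the notation used for ASCII marble tests; one character is treated as one frame of virtual time, which is an assumption of this sketch.

```javascript
// Hypothetical parser for illustration only. Supported symbols:
// '-' is one frame of time, '|' is completion, '#' is an error,
// and any other character is a value emitted in that frame.
function parseMarbles(diagram) {
  const events = [];
  diagram.split('').forEach((char, frame) => {
    if (char === '-') return; // time passes, nothing is emitted
    if (char === '|') {
      events.push({ frame: frame, kind: 'complete' });
    } else if (char === '#') {
      events.push({ frame: frame, kind: 'error' });
    } else {
      events.push({ frame: frame, kind: 'next', value: char });
    }
  });
  return events;
}

const events = parseMarbles('--a--b-|');
console.log(events);
```

For the diagram '--a--b-|' this yields a at frame 2, b at frame 5 and completion at frame 7. A test can then compare the events an operator actually produced against the events parsed from an expected diagram.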

Categories of operators

There are operators for different purposes, and they may be categorized as: creation, transformation, filtering, combination, multicasting, error handling, utility, etc. In the following list you will find all the operators organized in categories.

Creation Operators

Transformation Operators

Filtering Operators

Combination Operators

Multicasting Operators

Error Handling Operators

Utility Operators

Conditional and Boolean Operators

Mathematical and Aggregate Operators

Scheduler

What is a Scheduler? A scheduler controls when a subscription starts and when notifications are delivered. It consists of three components.

  • A Scheduler is a data structure. It knows how to store and queue tasks based on priority or other criteria.
  • A Scheduler is an execution context. It denotes where and when the task is executed (e.g. immediately, or in another callback mechanism such as setTimeout or process.nextTick, or the animation frame).
  • A Scheduler has a (virtual) clock. It provides a notion of "time" by a getter method now() on the scheduler. Tasks being scheduled on a particular scheduler will adhere only to the time denoted by that clock.

A Scheduler lets you define in what execution context an Observable will deliver notifications to its Observer.

In the example below, we take the usual simple Observable that emits values 1, 2, 3 synchronously, and use the operator observeOn to specify the async scheduler to use for delivering those values.

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
})
.observeOn(Rx.Scheduler.async);

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

Which executes with the output:

just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

Notice how the "got value..." notifications were delivered after "just after subscribe", which is different from the default behavior we have seen so far. This is because observeOn(Rx.Scheduler.async) introduces a proxy Observer between Observable.create and the final Observer. Let's rename some identifiers to make that distinction obvious in the example code:

var observable = Rx.Observable.create(function (proxyObserver) {
  proxyObserver.next(1);
  proxyObserver.next(2);
  proxyObserver.next(3);
  proxyObserver.complete();
})
.observeOn(Rx.Scheduler.async);

var finalObserver = {
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
};

console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

The proxyObserver is created in observeOn(Rx.Scheduler.async), and its next(val) function is approximately the following:

var proxyObserver = {
  next: (val) => {
    Rx.Scheduler.async.schedule(
      (x) => finalObserver.next(x),
      0 /* delay */,
      val /* will be the x for the function above */
    );
  },

  // ...
}

The async Scheduler operates with a setTimeout or setInterval, even if the given delay is zero. In JavaScript, setTimeout(fn, 0) runs the function fn at the earliest on the next event loop iteration. This explains why "got value 1" is delivered to the finalObserver after "just after subscribe" is logged.

The schedule() method of a Scheduler takes a delay argument, which refers to a quantity of time relative to the Scheduler's own internal clock. A Scheduler's clock need not have any relation to the actual wall-clock time. This is how temporal operators like delay operate not on actual time, but on time dictated by the Scheduler's clock. This is especially useful in testing, where a virtual time Scheduler may be used to fake wall-clock time while in reality executing scheduled tasks synchronously.
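The idea of a clock that is decoupled from wall-clock time can be sketched with a small virtual-time scheduler. The VirtualScheduler class below is hypothetical and much simpler than the schedulers shipped with RxJS; it only borrows the schedule(work, delay, state) and now() shapes described above, and adds a flush() method of its own to run pending tasks.

```javascript
// Hypothetical virtual-time scheduler for illustration; not the RxJS API.
class VirtualScheduler {
  constructor() {
    this.clock = 0;   // virtual time, unrelated to the wall clock
    this.tasks = [];  // pending tasks
    this.nextId = 0;  // insertion order, used to break ties
  }

  now() {
    return this.clock;
  }

  schedule(work, delay, state) {
    this.tasks.push({ due: this.clock + delay, id: this.nextId++, work, state });
  }

  // Run every pending task in due-time order, advancing the virtual clock
  // instantly instead of waiting.
  flush() {
    while (this.tasks.length > 0) {
      this.tasks.sort((a, b) => a.due - b.due || a.id - b.id);
      const task = this.tasks.shift();
      this.clock = task.due;
      task.work(task.state);
    }
  }
}

const scheduler = new VirtualScheduler();
const delivered = [];

// A proxy observer in the spirit of the one above: each value is handed to
// the scheduler instead of being delivered directly.
const proxy = {
  next: (val) => scheduler.schedule(
    (x) => delivered.push({ at: scheduler.now(), value: x }),
    1000 /* virtual delay */,
    val
  )
};

proxy.next('a');
proxy.next('b');
console.log(delivered.length); // 0, nothing runs until the scheduler is flushed
scheduler.flush();
console.log(delivered);
```

After flush(), both values are recorded at virtual time 1000 even though no real second has passed, which is what makes this style convenient for testing time-based logic.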

Scheduler Types

The async Scheduler is one of the built-in schedulers provided by RxJS. Each of these can be created and returned by using static properties of the Scheduler object.

Scheduler | Purpose
null | By not passing any scheduler, notifications are delivered synchronously and recursively. Use this for constant-time operations or tail recursive operations.
Rx.Scheduler.queue | Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations.
Rx.Scheduler.asap | Schedules on the micro task queue, which uses the fastest transport mechanism available, either Node.js' process.nextTick() or Web Worker MessageChannel or setTimeout or others. Use this for asynchronous conversions.
Rx.Scheduler.async | Schedules work with setInterval. Use this for time-based operations.

Using Schedulers

You may have already used schedulers in your RxJS code without explicitly stating the type of schedulers to be used. This is because all Observable operators that deal with concurrency have optional schedulers. If you do not provide the scheduler, RxJS will pick a default scheduler by using the principle of least concurrency. This means that the scheduler which introduces the least amount of concurrency that satisfies the needs of the operator is chosen. For example, for operators returning an observable with a finite and small number of messages, RxJS uses no Scheduler, i.e. null or undefined. For operators returning a potentially large or infinite number of messages, the queue Scheduler is used. For operators which use timers, the async Scheduler is used.

Because RxJS uses the least concurrency scheduler, you can pick a different scheduler if you want to introduce concurrency for performance purposes. To specify a particular scheduler, you can use those operator methods that take a scheduler, e.g., from([10, 20, 30], Rx.Scheduler.async).

Static creation operators usually take a Scheduler as argument. For instance, from(array, scheduler) lets you specify the Scheduler to use when delivering each notification converted from the array. It is usually the last argument to the operator. The following static creation operators take a Scheduler argument:

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer
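The effect of an optional Scheduler argument on a creation operator can be sketched without the library. The fromArray function and the manual scheduler object below are hypothetical. As a simplification, this sketch hands the whole array to the scheduler as one task, whereas the text above describes scheduling the delivery of each notification.

```javascript
// Hypothetical helpers for illustration only; not the RxJS implementation.
// A scheduler here is any object with a schedule(work) method.
function fromArray(array, scheduler) {
  return {
    subscribe(observer) {
      const emit = () => {
        array.forEach((v) => observer.next(v));
        observer.complete();
      };
      if (scheduler) {
        scheduler.schedule(emit); // defer delivery to the scheduler
      } else {
        emit();                   // no scheduler: deliver synchronously
      }
    }
  };
}

// A manual scheduler that queues work until run() is called.
const manual = {
  queue: [],
  schedule(work) { this.queue.push(work); },
  run() { while (this.queue.length) this.queue.shift()(); }
};

const order = [];
const observer = {
  next: (v) => order.push(v),
  complete: () => order.push('done')
};

fromArray([10, 20, 30]).subscribe(observer);
order.push('after sync subscribe');

fromArray([10, 20, 30], manual).subscribe(observer);
order.push('after scheduled subscribe');
manual.run();

console.log(order);
```

Without a scheduler, all values arrive before subscribe() returns. With the manual scheduler, subscribe() returns first and the values arrive only when the scheduler runs its queue.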

Use subscribeOn to schedule in what context the subscribe() call will happen. By default, a subscribe() call on an Observable will happen synchronously and immediately. However, you may delay or schedule the actual subscription to happen on a given Scheduler, using the instance operator subscribeOn(scheduler), where scheduler is an argument you provide.

Use observeOn to schedule in what context notifications will be delivered. As we saw in the examples above, the instance operator observeOn(scheduler) introduces a mediator Observer between the source Observable and the destination Observer, where the mediator schedules calls to the destination Observer using your given scheduler.

Instance operators may take a Scheduler as argument.

Time-related operators like bufferTime, debounceTime, delay, auditTime, sampleTime, throttleTime, timeInterval, timeout, timeoutWith, windowTime all take a Scheduler as the last argument, and otherwise operate by default on the Rx.Scheduler.async Scheduler.

Other instance operators that take a Scheduler as argument: cache, combineLatest, concat, expand, merge, publishReplay, startWith.

Notice that both cache and publishReplay accept a Scheduler because they utilize a ReplaySubject. The constructor of a ReplaySubject takes an optional Scheduler as the last argument because a ReplaySubject may deal with time, which only makes sense in the context of a Scheduler. By default, a ReplaySubject uses the queue Scheduler to provide a clock.