import {Observable} from '@reactivex/rxjs/es6/Observable.js'
public class

Observable

A representation of any set of values over any amount of time. This is the most basic building block of RxJS.

Static Method Summary

Static Public Methods
public static

bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable

Converts a callback API to a function that returns an Observable.

public static

bindNodeCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable

Converts a Node.js-style callback API to a function that returns an Observable.

public static

combineLatest(observable1: Observable, observable2: Observable, project: function, scheduler: Scheduler): Observable

Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

public static

concat(input1: Observable, input2: Observable, scheduler: Scheduler): Observable

Creates an output Observable which sequentially emits all values from each given input Observable, one input Observable after the other.

public static

create(subscribe: function(subscriber: Subscriber): TeardownLogic): Observable

Creates a new Observable that will execute the specified function when a Subscriber subscribes to it.

public static

defer(observableFactory: function(): Observable | Promise): Observable

Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer.

public static

empty(scheduler: Scheduler): Observable

Creates an Observable that emits no items to the Observer and immediately emits a complete notification.

public static

forkJoin(sources: *): any

public static

from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T>

Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.

public static

fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T>

Creates an Observable that emits events of a specific type coming from the given event target.

public static

fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function): void, selector: function(...args: any): T): Observable<T>

Creates an Observable from an API based on addHandler/removeHandler functions.

public static

fromPromise(promise: Promise<T>, scheduler: Scheduler): Observable<T>

Converts a Promise to an Observable.

public static

interval(period: number, scheduler: Scheduler): Observable

Creates an Observable that emits sequential numbers every specified interval of time, on a specified IScheduler.

public static

merge(observables: ...Observable, concurrent: number, scheduler: Scheduler): Observable

Creates an output Observable which concurrently emits all values from every given input Observable.

public static

Creates an Observable that emits no items to the Observer.

public static

of(values: ...T, scheduler: Scheduler): Observable<T>

Creates an Observable that emits some values you specify as arguments, immediately one after the other, and then emits a complete notification.

public static

range(start: number, count: number, scheduler: Scheduler): Observable

Creates an Observable that emits a sequence of numbers within a specified range.

public static

throw(error: any, scheduler: Scheduler): Observable

Creates an Observable that emits no items to the Observer and immediately emits an error notification.

public static

timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable

Creates an Observable that starts emitting after an initialDelay and emits ever increasing numbers after each period of time thereafter.

public static

webSocket(urlConfigOrSource: *): WebSocketSubject

public static

zip(observables: *): Observable<R>

Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.

Constructor Summary

Public Constructor
public

constructor(subscribe: Function)

Method Summary

Public Methods
public

An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable

public

audit(durationSelector: function(value: T): Observable | Promise): Observable<T>

Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.

public

auditTime(duration: number, scheduler: Scheduler): Observable<T>

Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.

public

buffer(closingNotifier: Observable<any>): Observable<T[]>

Buffers the source Observable values until closingNotifier emits.

public

bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]>

Buffers the source Observable values until the size hits the maximum bufferSize given.

public

bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]>

Buffers the source Observable values for a specific time period.

public

bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]>

Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits.

public

bufferWhen(closingSelector: function(): Observable): Observable<T[]>

Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer.

public

catch(selector: function): Observable

Catches errors on the observable to be handled by returning a new observable or throwing an error.

public

Converts a higher-order Observable into a first-order Observable by waiting for the outer Observable to complete, then applying combineLatest.

public

Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

public

concat(other: Observable, scheduler: Scheduler): Observable

Creates an output Observable which sequentially emits all values from every given input Observable after the current Observable.

public

Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.

public

concatMap(project: function(value: T, ?index: number): Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

public

concatMapTo(innerObservable: Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to the same Observable which is merged multiple times in a serialized fashion on the output Observable.

public

count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable

Counts the number of emissions on the source and emits that number when the source completes.

public

debounce(durationSelector: function(value: T): Observable | Promise): Observable

Emits a value from the source Observable only after a particular time span determined by another Observable has passed without another source emission.

public

debounceTime(dueTime: number, scheduler: Scheduler): Observable

Emits a value from the source Observable only after a particular time span has passed without another source emission.

public

defaultIfEmpty(defaultValue: any): Observable

Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.

public

delay(delay: number | Date, scheduler: Scheduler): Observable

Delays the emission of items from the source Observable by a given timeout or until a given Date.

public

delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable

Delays the emission of items from the source Observable by a given time span determined by the emissions of another Observable.

public

Converts an Observable of Notification objects into the emissions that they represent.

public

distinct(keySelector: function, flushes: Observable): Observable

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.

public

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.

public

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item, using a property accessed by using the key provided to check if the two items are distinct.

public

do(nextOrObserver: Observer | function, error: function, complete: function): Observable

Perform a side effect for every emission on the source Observable, but return an Observable that is identical to the source.

public

elementAt(index: number, defaultValue: T): Observable

Emits the single value at the specified index in a sequence of emissions from the source Observable.

public

every(predicate: function, thisArg: any): Observable

Returns an Observable that emits whether or not every item of the source satisfies the condition specified.

public

Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.

public

exhaustMap(project: function(value: T, ?index: number): Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.

public

expand(project: function(value: T, index: number), concurrent: number, scheduler: Scheduler): Observable

Recursively projects each source value to an Observable which is merged in the output Observable.

public

filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable

Filter items emitted by the source Observable by only emitting those that satisfy a specified predicate.

public

find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T>

Emits only the first value emitted by the source Observable that meets some condition.

public

findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable

Emits only the index of the first value emitted by the source Observable that meets some condition.

public

first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R>

Emits only the first value (or the first value that meets some condition) emitted by the source Observable.

public

forEach(next: Function, PromiseCtor: PromiseConstructor): Promise

public

groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>>

Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.

public

Ignores all items emitted by the source Observable and only passes calls of complete or error.

public

If the source Observable is empty it returns an Observable that emits true, otherwise it emits false.

public

last(predicate: function): Observable

Returns an Observable that emits only the last item emitted by the source Observable.

public

letProto(func: *): Observable<R>

public

lift(operator: Operator): Observable

Creates a new Observable, with this Observable as the source, and the passed operator defined as the new observable's operator.

public

map(project: function(value: T, index: number): R, thisArg: any): Observable<R>

Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable.

public

mapTo(value: any): Observable

Emits the given constant value on the output Observable every time the source Observable emits a value.

public

Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.

public

max(optional: Function): Observable

The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the largest value.

public

merge(other: Observable, concurrent: number, scheduler: Scheduler): Observable

Creates an output Observable which concurrently emits all values from every given input Observable.

public

mergeAll(concurrent: number): Observable

Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.

public

mergeMap(project: function(value: T, ?index: number): Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

Projects each source value to an Observable which is merged in the output Observable.

public

mergeMapTo(innerObservable: Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

Projects each source value to the same Observable which is merged multiple times in the output Observable.

public

mergeScan(project: *, seed: *, concurrent: *): Observable<R> | WebSocketSubject<T> | Observable<T>

public

min(optional: Function): Observable<R>

The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the smallest value.

public

multicast(Factory: Function | Subject, Optional: Function): Observable

Returns an Observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable that shares a single subscription to the underlying stream.

public

observeOn(scheduler: *, delay: *): Observable<R> | WebSocketSubject<T> | Observable<T>

public

Groups pairs of consecutive emissions together and emits them as an array of two values.

public

partition(predicate: function(value: T, index: number): boolean, thisArg: any): [Observable<T>, Observable<T>]

Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don't satisfy the predicate.

It's like filter, but returns two Observables: one like the output of filter, and the other with values that did not pass the condition.

public

pluck(properties: ...string): Observable

Maps each source value (an object) to its specified nested property.

public

publish(Optional: Function): *

Returns a ConnectableObservable, which is a variety of Observable that waits until its connect method is called before it begins emitting items to those Observers that have subscribed to it.

public
public
public

publishReplay(bufferSize: *, windowTime: *, scheduler: *): ConnectableObservable<T>

public

Returns an Observable that mirrors the first source Observable to emit an item from the combination of this Observable and supplied Observables

public

reduce(accumulator: function(acc: R, value: T): R, seed: R): Observable<R>

Applies an accumulator function over the source Observable, and returns the accumulated result when the source completes, given an optional seed value.

public

repeat(scheduler: Scheduler, count: number): Observable

Returns an Observable that repeats the stream of items emitted by the source Observable at most count times, on a particular IScheduler.

public

repeatWhen(receives: notificationHandler, the: scheduler): Observable

Returns an Observable that emits the same values as the source observable with the exception of a complete.

public

retry(number: number): Observable

Returns an Observable that mirrors the source Observable, resubscribing to it if it calls error and the predicate returns true for that specific exception and retry count.

public

retryWhen(receives: notificationHandler, the: scheduler): Observable

Returns an Observable that emits the same values as the source observable with the exception of an error.

public

sample(notifier: Observable<any>): Observable<T>

Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.

public

sampleTime(period: number, scheduler: Scheduler): Observable<T>

Emits the most recently emitted value from the source Observable within periodic time intervals.

public

scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R>

Applies an accumulator function over the source Observable, and returns each intermediate result, with an optional seed value.

public

sequenceEqual(compareTo: Observable, comparor: function): Observable

Compares all values of two observables in sequence using an optional comparor function and returns an observable of a single boolean value representing whether or not the two sequences are equal.

public

Returns a new Observable that multicasts (shares) the original Observable.

public

Returns an Observable that emits the single item emitted by the source Observable that matches a specified predicate, if that Observable emits one such item.

public

Returns an Observable that skips n items emitted by an Observable.

public

Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.

public

skipWhile(predicate: Function): Observable<T>

Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.

public

startWith(an: Values): Observable

Returns an Observable that emits the items in a specified Iterable before it begins to emit items emitted by the source Observable.

public

Asynchronously subscribes Observers to this Observable on the specified IScheduler.

public

Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables.

public

switchMap(project: function(value: T, ?index: number): Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.

public

switchMapTo(innerObservable: Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to the same Observable which is flattened multiple times with switch in the output Observable.

public

take(count: number): Observable<T>

Emits only the first count values emitted by the source Observable.

public

takeLast(count: number): Observable<T>

Emits only the last count values emitted by the source Observable.

public

Emits the values emitted by the source Observable until a notifier Observable emits a value.

public

takeWhile(predicate: function(value: T, index: number): boolean): Observable<T>

Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.

public

throttle(durationSelector: function(value: T): Observable | Promise): Observable<T>

Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.

public

throttleTime(duration: number, scheduler: Scheduler): Observable<T>

Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.

public

timeInterval(scheduler: *): Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T>

public

timeout(due: number, scheduler: Scheduler): Observable<R> | WebSocketSubject<T> | Observable<T>

public

timeoutWith(due: *, withObservable: *, scheduler: *): Observable<R> | WebSocketSubject<T> | Observable<T>

public

timestamp(scheduler: *): Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T>

public
public

toPromise(PromiseCtor: *): Promise<T>

public

window(windowBoundaries: Observable<any>): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable whenever windowBoundaries emits.

public

windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable with each nested Observable emitting at most windowSize values.

public

windowTime(windowTimeSpan: number, windowCreationInterval: number, scheduler: Scheduler): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable periodically in time.

public

windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable starting from an emission from openings and ending when the output of closingSelector emits.

public

windowWhen(closingSelector: function(): Observable): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable using a factory function of closing Observables to determine when to start a new window.

public

Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.

public

zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T>

public

zipProto(observables: *): Observable<R>

Static Public Methods

public static bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable

Converts a callback API to a function that returns an Observable.

Give it a function f of type f(x, callback) and it will return a function g that when called as g(x) will output an Observable.

bindCallback is not an operator because its input and output are not Observables. The input is a function func with some parameters, the last of which must be a callback function that func calls when it is done. The output of bindCallback is a function that takes the same parameters as func, except the last one (the callback). When the output function is called with arguments, it returns an Observable that delivers the results.
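
The wrapping described above can be sketched without the library. This is a minimal illustration, not the RxJS implementation: `bindCallbackSketch`, the Observable-like object it returns, and the `double` function are hypothetical names introduced here, and the optional selector and scheduler are left out. The real bindCallback may also differ in details such as how results are shared between subscribers.

```javascript
// Hypothetical sketch of the idea behind bindCallback; not the RxJS source.
// It ignores the optional selector and scheduler parameters.
function bindCallbackSketch(func) {
  // g takes the same parameters as func, minus the trailing callback.
  return function g(...args) {
    return {
      // Stand-in for an Observable: subscribe(next, error, complete).
      subscribe(next, error, complete) {
        try {
          // Call func with our own callback appended as the last argument.
          func(...args, function callback(result) {
            next(result);
            if (complete) complete();
          });
        } catch (err) {
          if (error) error(err);
        }
      }
    };
  };
}

// A made-up callback API of the form f(x, callback).
function double(x, callback) {
  callback(x * 2);
}

const doubleAsObservable = bindCallbackSketch(double);
const received = [];
doubleAsObservable(21).subscribe(
  x => received.push(x),
  e => received.push('error: ' + e.message),
  () => received.push('complete')
);
console.log(received); // [ 42, 'complete' ]
```

Note that nothing calls `double` until `subscribe` is invoked on the returned object.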

Params:

Name Type Attribute Description
func function

Function with a callback as the last parameter.

selector function
  • optional

A function which takes the arguments from the callback and maps them to a value to emit on the output Observable.

scheduler Scheduler
  • optional

The scheduler on which to schedule the callbacks.

Return:

function(...params: *): Observable

A function which returns the Observable that delivers the same values the callback would deliver.

Example:

Convert jQuery's getJSON to an Observable API
// Suppose we have jQuery.getJSON('/my/url', callback)
var getJSONAsObservable = Rx.Observable.bindCallback(jQuery.getJSON);
var result = getJSONAsObservable('/my/url');
result.subscribe(x => console.log(x), e => console.error(e));

public static bindNodeCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable

Converts a Node.js-style callback API to a function that returns an Observable.

It's just like bindCallback, but the callback is expected to be of type callback(error, result).

bindNodeCallback is not an operator because its input and output are not Observables. The input is a function func with some parameters, the last of which must be a callback function that func calls when it is done. The callback function is expected to follow Node.js conventions, where the first argument to the callback is an error and the remaining arguments are the callback result. The output of bindNodeCallback is a function that takes the same parameters as func, except the last one (the callback). When the output function is called with arguments, it returns an Observable that delivers the results.
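
The error-first convention described above can be illustrated with a small, dependency-free sketch. It is not the RxJS implementation: `bindNodeCallbackSketch` and `safeDivide` are hypothetical names, the selector and scheduler are omitted, and this sketch forwards only the first result argument of the callback.

```javascript
// Hypothetical sketch of the idea behind bindNodeCallback; not the RxJS source.
function bindNodeCallbackSketch(func) {
  return function g(...args) {
    return {
      // Stand-in for an Observable: subscribe(next, error, complete).
      subscribe(next, error, complete) {
        // Node.js style: the callback receives (err, result).
        func(...args, function callback(err, result) {
          if (err) {
            // A truthy first argument becomes an error notification.
            if (error) error(err);
            return;
          }
          next(result);
          if (complete) complete();
        });
      }
    };
  };
}

// A made-up Node.js-style API, used only for illustration.
function safeDivide(a, b, callback) {
  if (b === 0) {
    callback(new Error('division by zero'));
  } else {
    callback(null, a / b);
  }
}

const divide = bindNodeCallbackSketch(safeDivide);
const log = [];
const observer = [
  x => log.push(x),
  e => log.push('error: ' + e.message),
  () => log.push('complete')
];
divide(10, 2).subscribe(...observer);
divide(1, 0).subscribe(...observer);
console.log(log); // [ 5, 'complete', 'error: division by zero' ]
```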

Params:

Name Type Attribute Description
func function

Function with a callback as the last parameter.

selector function
  • optional

A function which takes the arguments from the callback and maps them to a value to emit on the output Observable.

scheduler Scheduler
  • optional

The scheduler on which to schedule the callbacks.

Return:

function(...params: *): Observable

A function which returns the Observable that delivers the same values the Node.js callback would deliver.

Example:

Read a file from the filesystem and get the data as an Observable
import * as fs from 'fs';
var readFileAsObservable = Rx.Observable.bindNodeCallback(fs.readFile);
var result = readFileAsObservable('./roadNames.txt', 'utf8');
result.subscribe(x => console.log(x), e => console.error(e));

public static combineLatest(observable1: Observable, observable2: Observable, project: function, scheduler: Scheduler): Observable

Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.

combineLatest combines the values from all the Observables passed as arguments. It subscribes to each Observable, in order, and whenever any input Observable emits, it collects an array of the most recent value from each input. If a project function is given, that array is passed to it as arguments and the return value is emitted; otherwise the array of recent values is emitted directly.
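
The bookkeeping described above can be sketched as follows. This is an illustration under simplifying assumptions, not the RxJS implementation: `combineLatestSketch` and `fromArraySketch` are hypothetical helpers, sources are synchronous stand-ins for Observables, completion and errors are ignored, and the sketch emits nothing until every source has produced at least one value.

```javascript
// Hypothetical stand-in for an Observable that emits array items synchronously.
function fromArraySketch(values) {
  return {
    subscribe(next) {
      values.forEach(v => next(v));
    }
  };
}

// Hypothetical sketch of combineLatest; not the RxJS source.
function combineLatestSketch(sources, project) {
  return {
    subscribe(next) {
      const latest = new Array(sources.length);
      const hasValue = sources.map(() => false);
      // Subscribe to each source in order and remember its latest value.
      sources.forEach((source, i) => {
        source.subscribe(value => {
          latest[i] = value;
          hasValue[i] = true;
          // Emit only once every source has produced a value.
          if (hasValue.every(Boolean)) {
            next(project ? project(...latest) : latest.slice());
          }
        });
      });
    }
  };
}

const weight = fromArraySketch([70, 72, 76, 79, 75]);
const height = fromArraySketch([1.76, 1.77, 1.78]);
const out = [];
combineLatestSketch([weight, height], (w, h) => w / (h * h))
  .subscribe(x => out.push(x));
console.log(out.length); // 3
```

Because the weight source finishes emitting before the height source is subscribed, only the last weight (75) is combined with each of the three heights.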

Params:

Name Type Attribute Description
observable1 Observable

An input Observable to combine with the other input Observables.

observable2 Observable

An input Observable to combine with the other input Observables. More than one input Observable may be given as arguments.

project function
  • optional

An optional function to project the values from the combined latest values into a new value on the output Observable.

scheduler Scheduler
  • optional
  • default: null

The IScheduler to use for subscribing to each input Observable.

Return:

Observable

An Observable of projected values from the most recent values from each input Observable, or an array of the most recent values from each input Observable.

Example:

Dynamically calculate the Body-Mass Index from an Observable of weight and one for height
var weight = Rx.Observable.of(70, 72, 76, 79, 75);
var height = Rx.Observable.of(1.76, 1.77, 1.78);
var bmi = Rx.Observable.combineLatest(weight, height, (w, h) => w / (h * h));
bmi.subscribe(x => console.log('BMI is ' + x));

// With output to console:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222

public static concat(input1: Observable, input2: Observable, scheduler: Scheduler): Observable

Creates an output Observable which sequentially emits all values from each given input Observable, one input Observable after the other.

Concatenates multiple Observables together by sequentially emitting their values, one Observable after the other.

Joins multiple Observables together by subscribing to them one at a time and merging their results into the output Observable. Will wait for each Observable to complete before moving on to the next.
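
The "wait for completion, then move on" behavior can be sketched without the library. This is a simplified illustration, not the RxJS implementation: `concatSketch` and `fromArraySketch` are hypothetical helpers, sources are synchronous stand-ins for Observables, and errors and the scheduler are ignored.

```javascript
// Hypothetical stand-in for an Observable: emits each item, then completes.
function fromArraySketch(values) {
  return {
    subscribe(next, complete) {
      values.forEach(v => next(v));
      complete();
    }
  };
}

// Hypothetical sketch of concat; not the RxJS source.
function concatSketch(...sources) {
  return {
    subscribe(next, complete) {
      let index = 0;
      function subscribeToNext() {
        if (index === sources.length) {
          // Every source has completed, so the output completes too.
          complete();
          return;
        }
        const source = sources[index++];
        // Only when this source completes is the following one subscribed.
        source.subscribe(next, subscribeToNext);
      }
      subscribeToNext();
    }
  };
}

const out = [];
concatSketch(fromArraySketch([0, 1, 2, 3]), fromArraySketch([1, 2, 3]))
  .subscribe(x => out.push(x), () => out.push('complete'));
console.log(out); // [ 0, 1, 2, 3, 1, 2, 3, 'complete' ]
```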

Params:

Name Type Attribute Description
input1 Observable

An input Observable to concatenate with others.

input2 Observable

An input Observable to concatenate with others. More than one input Observable may be given as arguments.

scheduler Scheduler
  • optional
  • default: null

An optional IScheduler to schedule each Observable subscription on.

Return:

Observable

All values of each passed Observable merged into a single Observable, in order, in serial fashion.

Example:

Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = Rx.Observable.concat(timer, sequence);
result.subscribe(x => console.log(x));

// results in:
// 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10

Concatenate 3 Observables
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var result = Rx.Observable.concat(timer1, timer2, timer3);
result.subscribe(x => console.log(x));

// results in the following:
// (Prints to console sequentially)
// -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
// -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
// -500ms-> 0 -500ms-> 1 -500ms-> ... 9

public static create(subscribe: function(subscriber: Subscriber): TeardownLogic): Observable

Creates a new Observable that will execute the specified function when a Subscriber subscribes to it.

Creates an Observable with custom logic given in the subscribe function.

create converts a subscribe function to an actual Observable. This is equivalent to calling the Observable constructor. Write the subscribe function so that it behaves as an Observable: It should invoke the Subscriber's next, error, and complete methods following the Observable Contract. A well-formed Observable must invoke either the Subscriber's complete method exactly once or its error method exactly once, and invoke nothing else thereafter.

Most of the time you should not need to use create, because the existing creation operators (together with instance combination operators) let you create an Observable for most use cases. However, create is low-level and is able to create any Observable.
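
The contract described above (at most one complete or error, and nothing after it) can be illustrated with a small sketch. It is not the RxJS implementation: `createSketch` is a hypothetical helper that wraps the observer so late notifications are dropped, and it supports only a function as teardown logic.

```javascript
// Hypothetical sketch of create and the Observable Contract; not the RxJS source.
function createSketch(subscribeFn) {
  return {
    subscribe(observer) {
      let closed = false;
      let teardown;
      // Run the teardown function at most once.
      function runTeardown() {
        if (typeof teardown === 'function') {
          const fn = teardown;
          teardown = undefined;
          fn();
        }
      }
      // Wrapper that ignores every notification after complete or error.
      const subscriber = {
        next(value) {
          if (!closed && observer.next) observer.next(value);
        },
        error(err) {
          if (closed) return;
          closed = true;
          if (observer.error) observer.error(err);
          runTeardown();
        },
        complete() {
          if (closed) return;
          closed = true;
          if (observer.complete) observer.complete();
          runTeardown();
        }
      };
      teardown = subscribeFn(subscriber);
      // If the function finished synchronously, tear down right away.
      if (closed) runTeardown();
      return {
        unsubscribe() {
          closed = true;
          runTeardown();
        }
      };
    }
  };
}

const events = [];
createSketch(function (subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(3); // ignored: the subscriber is already complete
  return () => events.push('teardown');
}).subscribe({
  next: x => events.push(x),
  complete: () => events.push('complete')
});
console.log(events); // [ 1, 2, 'complete', 'teardown' ]
```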

Params:

Name Type Attribute Description
subscribe function(subscriber: Subscriber): TeardownLogic
  • optional

A function that accepts a Subscriber, and invokes its next, error, and complete methods as appropriate, and should return some logic for tear down, either as a Subscription or as a function.

Return:

Observable

An Observable that, when subscribed, will execute the specified function.

Example:

Emit three random numbers, then complete.
var result = Rx.Observable.create(function (subscriber) {
  subscriber.next(Math.random());
  subscriber.next(Math.random());
  subscriber.next(Math.random());
  subscriber.complete();
});
result.subscribe(x => console.log(x));

public static defer(observableFactory: function(): Observable | Promise): Observable

Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer.

Creates the Observable lazily, that is, only when it is subscribed.

defer allows you to create the Observable only when the Observer subscribes, and create a fresh Observable for each Observer. It waits until an Observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual Observable.
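
The per-subscriber factory call can be sketched as follows. This is an illustration only, not the RxJS implementation: `deferSketch` is a hypothetical helper, the factory returns a hand-rolled Observable-like object, and the Promise case is not handled.

```javascript
// Hypothetical sketch of defer; not the RxJS source.
function deferSketch(observableFactory) {
  return {
    subscribe(next) {
      // The factory runs once per subscription, never before.
      return observableFactory().subscribe(next);
    }
  };
}

let factoryCalls = 0;
const deferred = deferSketch(() => {
  factoryCalls += 1;
  const id = factoryCalls;
  // A fresh Observable-like object for each subscriber.
  return {
    subscribe(next) {
      next('observable #' + id);
    }
  };
});

const callsBeforeSubscribe = factoryCalls; // still 0: nothing has subscribed yet
const seen = [];
deferred.subscribe(x => seen.push(x));
deferred.subscribe(x => seen.push(x));
console.log(callsBeforeSubscribe, seen); // 0 [ 'observable #1', 'observable #2' ]
```

Both subscribers call `subscribe` on the same `deferred` object, yet each one receives values from its own freshly created source.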

Params:

NameTypeAttributeDescription
observableFactory function(): Observable | Promise

The Observable factory function to invoke for each Observer that subscribes to the output Observable. May also return a Promise, which will be converted on the fly to an Observable.

Return:

Observable

An Observable whose Observers' subscriptions trigger an invocation of the given Observable factory function.

Example:

Subscribe to either an Observable of clicks or an Observable of interval, at random
var clicksOrInterval = Rx.Observable.defer(function () {
  if (Math.random() > 0.5) {
    return Rx.Observable.fromEvent(document, 'click');
  } else {
    return Rx.Observable.interval(1000);
  }
});
clicksOrInterval.subscribe(x => console.log(x));

// Results in the following behavior:
// If the result of Math.random() is greater than 0.5, it listens
// for clicks anywhere on the document; each click logs a MouseEvent
// object to the console. Otherwise, it emits ascending numbers,
// one every second (1000 ms).
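
The "fresh Observable for each Observer" behavior can be sketched without the library. This is a minimal illustration, assuming a hand-written object with a subscribe method stands in for an Observable; deferSketch and factoryCalls are hypothetical names, not part of RxJS.

```javascript
// Hypothetical stand-in for Observable.defer; not the RxJS implementation.
function deferSketch(observableFactory) {
  return {
    subscribe(observer) {
      // The factory runs here, at subscription time, once per Observer.
      return observableFactory().subscribe(observer);
    }
  };
}

let factoryCalls = 0;
const deferred = deferSketch(() => {
  factoryCalls += 1;
  const id = factoryCalls;
  // A tiny hand-written Observable that emits its own id.
  return { subscribe(observer) { observer.next(id); } };
});

const seen = [];
console.log(factoryCalls); // 0: nothing happens before the first subscribe
deferred.subscribe({ next: id => seen.push(id) });
deferred.subscribe({ next: id => seen.push(id) });
console.log(JSON.stringify(seen)); // [1,2]: each Observer got its own Observable
```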

Test:

See:

public static empty(scheduler: Scheduler): Observable source

Creates an Observable that emits no items to the Observer and immediately emits a complete notification.

Just emits 'complete', and nothing else.

This static operator is useful for creating a simple Observable that only emits the complete notification. It can be used for composing with other Observables, such as in a mergeMap.

Params:

NameTypeAttributeDescription
scheduler Scheduler
  • optional

An IScheduler to use for scheduling the emission of the complete notification.

Return:

Observable

An "empty" Observable: emits only the complete notification.

Example:

Emit the number 7, then complete.
var result = Rx.Observable.empty().startWith(7);
result.subscribe(x => console.log(x));

Map and flatten only odd numbers to the sequence 'a', 'b', 'c'
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
  x % 2 === 1 ? Rx.Observable.of('a', 'b', 'c') : Rx.Observable.empty()
);
result.subscribe(x => console.log(x));

// Results in the following on the console:
// x is the interval's counter (0, 1, 2, 3, ...), emitted every 1000 ms.
// If x % 2 is equal to 1, 'a', 'b' and 'c' are logged, one per line.
// If x % 2 is not equal to 1, nothing is logged.

Test:

See:

public static forkJoin(sources: *): any source

Params:

NameTypeAttributeDescription
sources *

Return:

any

Test:

public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T> source

Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.

Converts almost anything to an Observable.

from converts various other objects and data types into Observables. It converts a Promise, an array-like object, or an iterable object into an Observable that emits the items in that promise, array, or iterable. A String, in this context, is treated as an array of characters. Observable-like objects (objects that contain a function named with the ES2015 Symbol for Observable) can also be converted through this operator.

Params:

NameTypeAttributeDescription
ish ObservableInput<T>

A subscribable object, a Promise, an Observable-like, an Array, an iterable or an array-like object to be converted.

scheduler Scheduler
  • optional

The scheduler on which to schedule the emissions of values.

Return:

Observable<T>

The Observable whose values are originally from the input object that was converted.

Example:

Converts an array to an Observable
var array = [10, 20, 30];
var result = Rx.Observable.from(array);
result.subscribe(x => console.log(x));

// Results in the following:
// 10 20 30

Convert an infinite iterable (from a generator) to an Observable
function* generateDoubles(seed) {
  var i = seed;
  while (true) {
    yield i;
    i = 2 * i; // double it
  }
}

var iterator = generateDoubles(3);
var result = Rx.Observable.from(iterator).take(10);
result.subscribe(x => console.log(x));

// Results in the following:
// 3 6 12 24 48 96 192 384 768 1536
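
The generator example only terminates because take(10) stops pulling from the iterable. The sketch below models that pull loop in plain JavaScript; fromIterableSketch and its limit parameter are hypothetical stand-ins for from(...).take(limit), not library functions.

```javascript
// Hypothetical helper: pushes values from any iterable to `next`,
// stopping after `limit` values (a stand-in for from(...).take(limit)).
function fromIterableSketch(iterable, limit, next) {
  if (limit <= 0) return;
  let emitted = 0;
  for (const value of iterable) {
    next(value);
    emitted += 1;
    if (emitted >= limit) break; // leaving the loop also closes a generator
  }
}

function* generateDoubles(seed) {
  let i = seed;
  while (true) {
    yield i;
    i = 2 * i; // double it
  }
}

const doubles = [];
fromIterableSketch(generateDoubles(3), 10, x => doubles.push(x));
console.log(doubles.join(' ')); // 3 6 12 24 48 96 192 384 768 1536

// A String is iterated character by character.
const chars = [];
fromIterableSketch('abc', 3, x => chars.push(x));
console.log(chars.join(',')); // a,b,c
```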

Test:

See:

public static fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T> source

Creates an Observable that emits events of a specific type coming from the given event target.

Creates an Observable from DOM events, or Node EventEmitter events or others.

Creates an Observable by attaching an event listener to an "event target", which may be an object with addEventListener and removeEventListener, a Node.js EventEmitter, a jQuery style EventEmitter, a NodeList from the DOM, or an HTMLCollection from the DOM. The event handler is attached when the output Observable is subscribed, and removed when the Subscription is unsubscribed.

Params:

NameTypeAttributeDescription
target EventTargetLike

The DOMElement, event target, Node.js EventEmitter, NodeList or HTMLCollection to attach the event handler to.

eventName string

The event name of interest, being emitted by the target.

options EventListenerOptions
  • optional

Options to pass through to addEventListener

selector SelectorMethodSignature<T>
  • optional

An optional function to post-process results. It takes the arguments from the event handler and should return a single value.

Return:

Observable<T>

Example:

Emits clicks happening on the DOM document
var clicks = Rx.Observable.fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));

// Results in:
// MouseEvent object logged to console every time a click
// occurs on the document.

Test:

See:

public static fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function): void, selector: function(...args: any): T): Observable<T> source

Creates an Observable from an API based on addHandler/removeHandler functions.

Converts any addHandler/removeHandler API to an Observable.

Creates an Observable by using the addHandler and removeHandler functions to add and remove the handlers, with an optional selector function to project the event arguments to a result. The addHandler is called when the output Observable is subscribed, and removeHandler is called when the Subscription is unsubscribed.

Params:

NameTypeAttributeDescription
addHandler function(handler: Function): any

A function that takes a handler function as argument and attaches it somehow to the actual source of events.

removeHandler function(handler: Function): void

A function that takes a handler function as argument and removes it in case it was previously attached using addHandler.

selector function(...args: any): T
  • optional

An optional function to post-process results. It takes the arguments from the event handler and should return a single value.

Return:

Observable<T>

Example:

Emits clicks happening on the DOM document
function addClickHandler(handler) {
  document.addEventListener('click', handler);
}

function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}

var clicks = Rx.Observable.fromEventPattern(
  addClickHandler,
  removeClickHandler
);
clicks.subscribe(x => console.log(x));
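
The attach-on-subscribe and detach-on-unsubscribe behavior can be tried outside the browser. The following is a minimal sketch, assuming a made-up event source (the handlers set and the fire function); fromEventPatternSketch is a hypothetical name and omits the optional selector.

```javascript
// Hypothetical stand-in for Observable.fromEventPattern (selector omitted).
function fromEventPatternSketch(addHandler, removeHandler) {
  return {
    subscribe(next) {
      const handler = (...args) => next(...args);
      addHandler(handler); // attached when subscribed
      return {
        unsubscribe() {
          removeHandler(handler); // detached when unsubscribed
        }
      };
    }
  };
}

// A made-up event source with its own add/remove API.
const handlers = new Set();
const fire = value => handlers.forEach(h => h(value));

const events = fromEventPatternSketch(
  h => handlers.add(h),
  h => handlers.delete(h)
);

const received = [];
fire('before'); // nobody is listening yet
const sub = events.subscribe(x => received.push(x));
fire('a');
fire('b');
sub.unsubscribe();
fire('after'); // the handler has already been removed
console.log(received.join(',')); // a,b
```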

Test:

See:

public static fromPromise(promise: Promise<T>, scheduler: Scheduler): Observable<T> source

Converts a Promise to an Observable.

Returns an Observable that just emits the Promise's resolved value, then completes.

Converts an ES2015 Promise or a Promises/A+ spec compliant Promise to an Observable. If the Promise resolves with a value, the output Observable emits that resolved value as a next, and then completes. If the Promise is rejected, then the output Observable emits the corresponding Error.

Params:

NameTypeAttributeDescription
promise Promise<T>

The promise to be converted.

scheduler Scheduler
  • optional

An optional IScheduler to use for scheduling the delivery of the resolved value (or the rejection).

Return:

Observable<T>

An Observable which wraps the Promise.

Example:

Convert the Promise returned by Fetch to an Observable
var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));

Test:

See:

public static interval(period: number, scheduler: Scheduler): Observable source

Creates an Observable that emits sequential numbers every specified interval of time, on a specified IScheduler.

Emits incremental numbers periodically in time.

interval returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between those emissions. The first emission is not sent immediately, but only after the first period has passed. By default, this operator uses the async IScheduler to provide a notion of time, but you may pass any IScheduler to it.

Params:

NameTypeAttributeDescription
period number
  • optional
  • default: 0

The interval size in milliseconds (by default) or the time unit determined by the scheduler's clock.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for scheduling the emission of values, and providing a notion of "time".

Return:

Observable

An Observable that emits a sequential number each time interval.

Example:

Emits ascending numbers, one every second (1000ms)
var numbers = Rx.Observable.interval(1000);
numbers.subscribe(x => console.log(x));

Test:

See:

public static merge(observables: ...Observable, concurrent: number, scheduler: Scheduler): Observable source

Creates an output Observable which concurrently emits all values from every given input Observable.

Flattens multiple Observables together by blending their values into one Observable.

merge subscribes to each given input Observable (as arguments), and simply forwards (without doing any transformation) all the values from all the input Observables to the output Observable. The output Observable only completes once all input Observables have completed. Any error delivered by an input Observable will be immediately emitted on the output Observable.

Params:

NameTypeAttributeDescription
observables ...Observable

Input Observables to merge together.

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

scheduler Scheduler
  • optional
  • default: null

The IScheduler to use for managing concurrency of input Observables.

Return:

Observable

An Observable that emits items that are the result of every input Observable.

Example:

Merge together two Observables: 1s interval and clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = Rx.Observable.merge(clicks, timer);
clicksOrTimer.subscribe(x => console.log(x));

// Results in the following:
// timer will emit ascending values, one every second (1000 ms), to the console
// clicks logs MouseEvents to the console every time the document is clicked
// Since the two streams are merged you see these happening
// as they occur.

Merge together 3 Observables, but only 2 run concurrently
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // the argument
var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));

// Results in the following:
// - First timer1 and timer2 will run concurrently
// - timer1 will emit a value every 1000ms for 10 iterations
// - timer2 will emit a value every 2000ms for 6 iterations
// - after timer1 hits its max iteration, timer2 will
//   continue, and timer3 will start to run concurrently with timer2
// - when timer2 hits its max iteration it terminates, and
//   timer3 will continue to emit a value every 500ms until it is complete

See:

public static never(): Observable source

Creates an Observable that emits no items to the Observer.

An Observable that never emits anything.

This static operator is useful for creating a simple Observable that emits neither values nor errors nor the completion notification. It can be used for testing purposes or for composing with other Observables. Please note that by never emitting a complete notification, this Observable keeps the subscription from being disposed automatically. Subscriptions need to be manually disposed.

Return:

Observable

A "never" Observable: never emits anything.

Example:

Emit the number 7, then never emit anything else (not even complete).
function info() {
  console.log('Will not be called');
}
var result = Rx.Observable.never().startWith(7);
result.subscribe(x => console.log(x), info, info);

Test:

See:

public static of(values: ...T, scheduler: Scheduler): Observable<T> source

Creates an Observable that emits some values you specify as arguments, immediately one after the other, and then emits a complete notification.

Emits the arguments you provide, then completes.

This static operator is useful for creating a simple Observable that only emits the arguments given, and the complete notification thereafter. It can be used for composing with other Observables, such as with concat. By default, it uses a null IScheduler, which means the next notifications are sent synchronously, although with a different IScheduler it is possible to determine when those notifications will be delivered.

Params:

NameTypeAttributeDescription
values ...T

Arguments that represent next values to be emitted.

scheduler Scheduler
  • optional

An IScheduler to use for scheduling the emissions of the next notifications.

Return:

Observable<T>

An Observable that emits each given input value.

Example:

Emit 10, 20, 30, then 'a', 'b', 'c', then start ticking every second.
var numbers = Rx.Observable.of(10, 20, 30);
var letters = Rx.Observable.of('a', 'b', 'c');
var interval = Rx.Observable.interval(1000);
var result = numbers.concat(letters).concat(interval);
result.subscribe(x => console.log(x));

Test:

See:

public static range(start: number, count: number, scheduler: Scheduler): Observable source

Creates an Observable that emits a sequence of numbers within a specified range.

Emits a sequence of numbers in a range.

The range operator emits a range of sequential integers, in order, where you select the start of the range and its length. By default, it uses no IScheduler and delivers the notifications synchronously, but it may use an optional IScheduler to regulate those deliveries.

Params:

NameTypeAttributeDescription
start number
  • optional
  • default: 0

The value of the first integer in the sequence.

count number
  • optional
  • default: 0

The number of sequential integers to generate.

scheduler Scheduler
  • optional

An IScheduler to use for scheduling the emissions of the notifications.

Return:

Observable

An Observable of numbers that emits a finite range of sequential integers.

Example:

Emits the numbers 1 to 10
var numbers = Rx.Observable.range(1, 10);
numbers.subscribe(x => console.log(x));
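
Note that the second argument is a length, not an end value. The sketch below is a plain JavaScript model of the scheduler-less case described above; rangeSketch is a hypothetical name, not the library function.

```javascript
// Hypothetical stand-in for Observable.range with no scheduler:
// delivers `count` sequential integers synchronously, starting at `start`.
function rangeSketch(start, count, next) {
  for (let i = 0; i < count; i++) {
    next(start + i);
  }
}

const oneToTen = [];
rangeSketch(1, 10, x => oneToTen.push(x));
console.log(oneToTen.join(' ')); // 1 2 3 4 5 6 7 8 9 10

const fromFive = [];
rangeSketch(5, 3, x => fromFive.push(x));
console.log(fromFive.join(' ')); // 5 6 7 (three values, not the numbers 5 to 3)
```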

Test:

See:

public static throw(error: any, scheduler: Scheduler): Observable source

Creates an Observable that emits no items to the Observer and immediately emits an error notification.

Just emits 'error', and nothing else.

This static operator is useful for creating a simple Observable that only emits the error notification. It can be used for composing with other Observables, such as in a mergeMap.

Params:

NameTypeAttributeDescription
error any

The particular Error to pass to the error notification.

scheduler Scheduler
  • optional

An IScheduler to use for scheduling the emission of the error notification.

Return:

Observable

An error Observable: emits only the error notification using the given error argument.

Example:

Emit the number 7, then emit an error.
var result = Rx.Observable.throw(new Error('oops!')).startWith(7);
result.subscribe(x => console.log(x), e => console.error(e));

Map and flatten numbers to the sequence 'a', 'b', 'c', but throw an error for 13
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
  x === 13 ?
    Rx.Observable.throw('Thirteens are bad') :
    Rx.Observable.of('a', 'b', 'c')
);
result.subscribe(x => console.log(x), e => console.error(e));

Test:

See:

public static timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable source

Creates an Observable that starts emitting after an initialDelay and emits ever increasing numbers after each period of time thereafter.

It's like interval, but you can specify when the emissions should start.

timer returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time, period, of your choosing between those emissions. The first emission happens after the specified initialDelay. The initial delay may be a Date. By default, this operator uses the async IScheduler to provide a notion of time, but you may pass any IScheduler to it. If period is not specified, the output Observable emits only one value, 0. Otherwise, it emits an infinite sequence.

Params:

NameTypeAttributeDescription
initialDelay number | Date

The initial delay time to wait before emitting the first value of 0.

period number
  • optional

The period of time between emissions of the subsequent numbers.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for scheduling the emission of values, and providing a notion of "time".

Return:

Observable

An Observable that emits a 0 after the initialDelay and ever increasing numbers after each period of time thereafter.

Example:

Emits ascending numbers, one every second (1000ms), starting after 3 seconds
var numbers = Rx.Observable.timer(3000, 1000);
numbers.subscribe(x => console.log(x));

Emits one number after five seconds
var numbers = Rx.Observable.timer(5000);
numbers.subscribe(x => console.log(x));

Test:

See:

public static webSocket(urlConfigOrSource: *): WebSocketSubject source

Params:

NameTypeAttributeDescription
urlConfigOrSource *

Test:

public static zip(observables: *): Observable<R> source

Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.

If the last parameter is a function, this function is used to compute the created value from the input values. Otherwise, an array of the input values is returned.

Params:

NameTypeAttributeDescription
observables *

Return:

Observable<R>

Example:

Combine age and name from different sources

let age$ = Observable.of<number>(27, 25, 29);
let name$ = Observable.of<string>('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of<boolean>(true, true, false);

Observable
    .zip(age$,
         name$,
         isDev$,
         (age: number, name: string, isDev: boolean) => ({ age, name, isDev }))
    .subscribe(x => console.log(x));

// outputs
// { age: 27, name: 'Foo', isDev: true }
// { age: 25, name: 'Bar', isDev: true }
// { age: 29, name: 'Beer', isDev: false }
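
The pairing-by-position rule, and the role of a trailing function argument, can be modeled with arrays. This is a minimal sketch in which each array stands in for the values of a synchronous Observable; zipSketch is a hypothetical name, not the library function.

```javascript
// Hypothetical stand-in for Observable.zip over synchronous inputs.
// Each input is an array standing in for an Observable's values, in order.
function zipSketch(...args) {
  const last = args[args.length - 1];
  // If the last argument is a function, it computes each output value.
  const project = typeof last === 'function' ? args.pop() : null;
  if (args.length === 0) return [];
  const length = Math.min(...args.map(input => input.length));
  const out = [];
  for (let i = 0; i < length; i++) {
    const values = args.map(input => input[i]); // i-th value of every input
    out.push(project ? project(...values) : values);
  }
  return out;
}

const people = zipSketch(
  [27, 25, 29],
  ['Foo', 'Bar', 'Beer'],
  [true, true, false],
  (age, name, isDev) => ({ age, name, isDev })
);
people.forEach(p => console.log(JSON.stringify(p)));
// {"age":27,"name":"Foo","isDev":true}
// {"age":25,"name":"Bar","isDev":true}
// {"age":29,"name":"Beer","isDev":false}

// Without a function, arrays of the paired values are produced.
const pairs = zipSketch([1, 2, 3], ['a', 'b']);
console.log(JSON.stringify(pairs)); // [[1,"a"],[2,"b"]]
```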

Test:

Public Constructors

public constructor(subscribe: Function) source

Params:

NameTypeAttributeDescription
subscribe Function

The function that is called when the Observable is initially subscribed to. This function is given a Subscriber, on which next can be called to emit new values, error can be called to raise an error, or complete can be called to signal successful completion.

Public Methods

public [$$observable](): Observable source

An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable

Return:

Observable

this instance of the observable

public audit(durationSelector: function(value: T): Observable | Promise): Observable<T> source

Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.

It's like auditTime, but the silencing duration is determined by a second Observable.

audit is similar to throttle, but emits the last value from the silenced time window, instead of the first value. audit emits the most recent value from the source Observable on the output Observable as soon as its internal timer becomes disabled, and ignores source values while the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, the timer is enabled by calling the durationSelector function with the source value, which returns the "duration" Observable. When the duration Observable emits a value or completes, the timer is disabled, then the most recent source value is emitted on the output Observable, and this process repeats for the next source value.

Params:

NameTypeAttributeDescription
durationSelector function(value: T): Observable | Promise

A function that receives a value from the source Observable, for computing the silencing duration, returned as an Observable or a Promise.

Return:

Observable<T>

An Observable that performs rate-limiting of emissions from the source Observable.

Example:

Emit clicks at a rate of at most one click per second
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.audit(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public auditTime(duration: number, scheduler: Scheduler): Observable<T> source

Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.

When it sees a source value, it ignores that plus the next ones for duration milliseconds, and then it emits the most recent value from the source.

auditTime is similar to throttleTime, but emits the last value from the silenced time window, instead of the first value. auditTime emits the most recent value from the source Observable on the output Observable as soon as its internal timer becomes disabled, and ignores source values while the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, the timer is enabled. After duration milliseconds (or the time unit determined internally by the optional scheduler) has passed, the timer is disabled, then the most recent source value is emitted on the output Observable, and this process repeats for the next source value. Optionally takes an IScheduler for managing timers.

Params:

NameTypeAttributeDescription
duration number

Time to wait before emitting the most recent source value, measured in milliseconds or the time unit determined internally by the optional scheduler.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for managing the timers that handle the rate-limiting behavior.

Return:

Observable<T>

An Observable that performs rate-limiting of emissions from the source Observable.

Example:

Emit clicks at a rate of at most one click per second
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.auditTime(1000);
result.subscribe(x => console.log(x));
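
The timer behavior described above can be traced by hand on a list of timestamped values. The sketch below is a virtual-time model, not the RxJS implementation: auditTimeSketch and the { t, value } event shape are hypothetical, and it assumes the source does not complete while a timer is still running.

```javascript
// events: [{ t, value }] sorted by time t in milliseconds.
// Returns the emissions as { t, value } records.
function auditTimeSketch(events, duration) {
  const out = [];
  let timerEnd = null; // null means the timer is disabled
  let latest;
  for (const { t, value } of events) {
    if (timerEnd !== null && t >= timerEnd) {
      // The timer ran out before this value arrived: emit the latest value.
      out.push({ t: timerEnd, value: latest });
      timerEnd = null;
    }
    if (timerEnd === null) timerEnd = t + duration; // a value enables the timer
    latest = value;
  }
  // Assumption: the source stays open, so the last timer also runs out.
  if (timerEnd !== null) out.push({ t: timerEnd, value: latest });
  return out;
}

const clickTimes = [
  { t: 0, value: 'a' },
  { t: 300, value: 'b' },
  { t: 900, value: 'c' },
  { t: 2500, value: 'd' }
];
const audited = auditTimeSketch(clickTimes, 1000);
console.log(JSON.stringify(audited));
// [{"t":1000,"value":"c"},{"t":3500,"value":"d"}]
```

The first window opens at 0 ms and closes at 1000 ms with 'c', the last value seen in it; 'a' and 'b' are dropped.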

Test:

See:

public buffer(closingNotifier: Observable<any>): Observable<T[]> source

Buffers the source Observable values until closingNotifier emits.

Collects values from the past as an array, and emits that array only when another Observable emits.

Buffers the incoming Observable values until the given closingNotifier Observable emits a value, at which point it emits the buffer on the output Observable and starts a new buffer internally, awaiting the next time closingNotifier emits.

Params:

NameTypeAttributeDescription
closingNotifier Observable<any>

An Observable that signals the buffer to be emitted on the output Observable.

Return:

Observable<T[]>

An Observable of buffers, which are arrays of values.

Example:

On every click, emit array of most recent interval events
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var buffered = interval.buffer(clicks);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]> source

Buffers the source Observable values until the size hits the maximum bufferSize given.

Collects values from the past as an array, and emits that array only when its size reaches bufferSize.

Buffers a number of values from the source Observable by bufferSize then emits the buffer and clears it, and starts a new buffer each startBufferEvery values. If startBufferEvery is not provided or is null, then new buffers are started immediately at the start of the source and when each buffer closes and is emitted.

Params:

NameTypeAttributeDescription
bufferSize number

The maximum size of the buffer emitted.

startBufferEvery number
  • optional

Interval at which to start a new buffer. For example if startBufferEvery is 2, then a new buffer will be started on every other value from the source. A new buffer is started at the beginning of the source by default.

Return:

Observable<T[]>

An Observable of arrays of buffered values.

Example:

Emit the last two click events as an array
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2);
buffered.subscribe(x => console.log(x));

On every click, emit the last two click events as an array
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2, 1);
buffered.subscribe(x => console.log(x));
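
The difference between the two calls is easier to see on plain numbers than on click events. The following is a simplified model, assuming an array stands in for the source values; bufferCountSketch is a hypothetical name, and partially filled buffers left over when the input ends are ignored here, which may differ from the library.

```javascript
// Hypothetical model of bufferCount over an array of source values.
function bufferCountSketch(values, bufferSize, startBufferEvery) {
  // Without startBufferEvery, a new buffer starts when the previous one closes.
  const every = startBufferEvery == null ? bufferSize : startBufferEvery;
  const open = []; // buffers currently being filled, oldest first
  const out = [];
  values.forEach((value, index) => {
    if (index % every === 0) open.push([]);
    for (const buffer of open) buffer.push(value);
    // Only the oldest open buffer can have reached bufferSize.
    if (open.length > 0 && open[0].length === bufferSize) out.push(open.shift());
  });
  return out; // leftover partial buffers are dropped in this sketch
}

const pairsOnly = bufferCountSketch([1, 2, 3, 4, 5], 2);
console.log(JSON.stringify(pairsOnly)); // [[1,2],[3,4]]

const sliding = bufferCountSketch([1, 2, 3, 4, 5], 2, 1);
console.log(JSON.stringify(sliding)); // [[1,2],[2,3],[3,4],[4,5]]
```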

Test:

See:

public bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]> source

Buffers the source Observable values for a specific time period.

Collects values from the past as an array, and emits those arrays periodically in time.

Buffers values from the source for a specific time duration bufferTimeSpan. Unless the optional argument bufferCreationInterval is given, it emits and resets the buffer every bufferTimeSpan milliseconds. If bufferCreationInterval is given, this operator opens the buffer every bufferCreationInterval milliseconds and closes (emits and resets) the buffer every bufferTimeSpan milliseconds. When the optional argument maxBufferSize is specified, the buffer will be closed either after bufferTimeSpan milliseconds or when it contains maxBufferSize elements.

Params:

NameTypeAttributeDescription
bufferTimeSpan number

The amount of time to fill each buffer array.

bufferCreationInterval number
  • optional

The interval at which to start new buffers.

maxBufferSize number
  • optional

The maximum buffer size.

scheduler Scheduler
  • optional
  • default: async

The scheduler on which to schedule the intervals that determine buffer boundaries.

Return:

Observable<T[]>

An observable of arrays of buffered values.

Example:

Every second, emit an array of the recent click events
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(1000);
buffered.subscribe(x => console.log(x));

Every 5 seconds, emit the click events from the next 2 seconds
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(2000, 5000);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]> source

Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits.

Collects values from the past as an array. Starts collecting only when openings emits, and calls the closingSelector function to get an Observable that tells when to close the buffer.

Buffers values from the source by opening the buffer via signals from an Observable provided to openings, and closing and sending the buffers when a Subscribable or Promise returned by the closingSelector function emits.

Params:

NameTypeAttributeDescription
openings SubscribableOrPromise<O>

A Subscribable or Promise of notifications to start new buffers.

closingSelector function(value: O): SubscribableOrPromise

A function that takes the value emitted by the openings observable and returns a Subscribable or Promise, which, when it emits, signals that the associated buffer should be emitted and cleared.

Return:

Observable<T[]>

An observable of arrays of buffered values.

Example:

Every other second, emit the click events from the next 500ms
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var buffered = clicks.bufferToggle(openings, i =>
  i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferWhen(closingSelector: function(): Observable): Observable<T[]> source

Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer.

Collects values from the past as an array. When it starts collecting values, it calls a function that returns an Observable that tells when to close the buffer and restart collecting.

Opens a buffer immediately, then closes the buffer when the observable returned by calling closingSelector function emits a value. When it closes the buffer, it immediately opens a new buffer and repeats the process.

Params:

NameTypeAttributeDescription
closingSelector function(): Observable

A function that takes no arguments and returns an Observable that signals buffer closure.

Return:

Observable<T[]>

An observable of arrays of buffered values.

Example:

Emit an array of the last clicks every [1-5] random seconds
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferWhen(() =>
  Rx.Observable.interval(1000 + Math.random() * 4000)
);
buffered.subscribe(x => console.log(x));

Test:

See:

public catch(selector: function): Observable source

Catches errors on the observable to be handled by returning a new observable or throwing an error.

Params:

NameTypeAttributeDescription
selector function

A function that takes as arguments err, which is the error, and caught, which is the source observable, in case you'd like to "retry" that observable by returning it again. Whatever observable is returned by the selector will be used to continue the observable chain.

Return:

Observable

An observable that originates from either the source or the observable returned by the catch selector function.
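
Example:

Continuing with a fallback after an error can be sketched without the library. This is a minimal illustration, not the RxJS implementation: catchSketch is a hypothetical name, hand-written objects with a subscribe method stand in for Observables, and the caught argument is omitted for brevity.

```javascript
// Hypothetical stand-in for the catch operator (the `caught` argument is omitted).
function catchSketch(source, selector) {
  return {
    subscribe(observer) {
      source.subscribe({
        next: value => observer.next(value),
        error: err => {
          let replacement;
          try {
            replacement = selector(err); // may return an Observable or throw
          } catch (selectorError) {
            observer.error(selectorError);
            return;
          }
          replacement.subscribe(observer); // continue the chain with it
        },
        complete: () => observer.complete()
      });
    }
  };
}

const failing = {
  subscribe(observer) {
    observer.next(1);
    observer.next(2);
    observer.error(new Error('oops!'));
  }
};
const fallback = {
  subscribe(observer) {
    observer.next('recovered');
    observer.complete();
  }
};

const output = [];
catchSketch(failing, err => {
  output.push('caught: ' + err.message);
  return fallback;
}).subscribe({
  next: x => output.push(x),
  error: () => output.push('error'),
  complete: () => output.push('done')
});
console.log(JSON.stringify(output));
// [1,2,"caught: oops!","recovered","done"]
```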

Test:

public combineAll(project: function): Observable source

Converts a higher-order Observable into a first-order Observable by waiting for the outer Observable to complete, then applying combineLatest.

Flattens an Observable-of-Observables by applying combineLatest when the Observable-of-Observables completes.

Takes an Observable of Observables, and collects all Observables from it. Once the outer Observable completes, it subscribes to all collected Observables and combines their values using the combineLatest strategy, such that:

  • Every time an inner Observable emits, the output Observable emits.
  • When the returned observable emits, it emits all of the latest values by:
    • If a project function is provided, it is called with each recent value from each inner Observable in whatever order they arrived, and the result of the project function is what is emitted by the output Observable.
    • If there is no project function, an array of all of the most recent values is emitted by the output Observable.

Params:

NameTypeAttributeDescription
project function
  • optional

An optional function to map the most recent values from each inner Observable into a new result. Takes each of the most recent values from each collected inner Observable as arguments, in order.

Return:

Observable

An Observable of projected results or arrays of recent values.

Example:

Map two click events to a finite interval Observable, then apply combineAll
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map(ev =>
  Rx.Observable.interval(Math.random()*2000).take(3)
).take(2);
var result = higherOrder.combineAll();
result.subscribe(x => console.log(x));

public combineLatest(other: Observable, project: function): Observable source

Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.

combineLatest combines the values from this Observable with values from the Observables passed as arguments. It subscribes to each Observable in order and, whenever any input Observable emits, collects an array of the most recent value from each input. Nothing is emitted until every input Observable has emitted at least one value. If a project function is provided, the array is passed to it as arguments and its return value is emitted; otherwise the array of recent values is emitted directly.

Params:

Name Type Attribute Description
other Observable

An input Observable to combine with the source Observable. More than one input Observable may be given as arguments.

project function
  • optional

An optional function to project the values from the combined latest values into a new value on the output Observable.

Return:

Observable

An Observable of projected values from the most recent values from each input Observable, or an array of the most recent values from each input Observable.

Example:

Dynamically calculate the Body-Mass Index from an Observable of weight and one for height
var weight = Rx.Observable.of(70, 72, 76, 79, 75);
var height = Rx.Observable.of(1.76, 1.77, 1.78);
var bmi = weight.combineLatest(height, (w, h) => w / (h * h));
bmi.subscribe(x => console.log('BMI is ' + x));

// With output to console:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222
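
Only three lines are logged above because Rx.Observable.of emits synchronously: weight runs to completion, leaving 75 as its latest value, before height emits anything. The following plain-JavaScript sketch models that bookkeeping on a list of arrival events. The function combineLatestModel and the event shape are hypothetical names used for illustration, not part of RxJS.

```javascript
// Model: each arrival is { source: <index of the input>, value }.
// A combined value is produced whenever any input emits, once every
// input has produced at least one value.
function combineLatestModel(sourceCount, arrivals, project) {
  const latest = new Array(sourceCount);
  const seen = new Array(sourceCount).fill(false);
  const output = [];
  for (const { source, value } of arrivals) {
    latest[source] = value;
    seen[source] = true;
    if (seen.every(Boolean)) {
      output.push(project ? project(...latest) : latest.slice());
    }
  }
  return output;
}

// Synchronous inputs: all weights arrive before the first height.
const weights = [70, 72, 76, 79, 75].map(value => ({ source: 0, value }));
const heights = [1.76, 1.77, 1.78].map(value => ({ source: 1, value }));
const bmi = combineLatestModel(2, [...weights, ...heights], (w, h) => w / (h * h));
console.log(bmi.length); // 3
```

If the two inputs emitted interleaved over time instead, every arrival after the first height would produce an output.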

public concat(other: Observable, scheduler: Scheduler): Observable source

Creates an output Observable which sequentially emits all values from every given input Observable after the current Observable.

Concatenates multiple Observables together by sequentially emitting their values, one Observable after the other.

Joins this Observable with multiple other Observables by subscribing to them one at a time, starting with the source, and merging their results into the output Observable. Will wait for each Observable to complete before moving on to the next.

Params:

Name Type Attribute Description
other Observable

An input Observable to concatenate after the source Observable. More than one input Observable may be given as arguments.

scheduler Scheduler
  • optional
  • default: null

An optional IScheduler to schedule each Observable subscription on.

Return:

Observable

All values of each passed Observable merged into a single Observable, in order, in serial fashion.

Example:

Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = timer.concat(sequence);
result.subscribe(x => console.log(x));

// results in:
// 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10

Concatenate 3 Observables
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var result = timer1.concat(timer2, timer3);
result.subscribe(x => console.log(x));

// results in the following:
// (Prints to console sequentially)
// -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
// -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
// -500ms-> 0 -500ms-> 1 -500ms-> ... 9
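
The timing in the first example can be reproduced with a small dependency-free model. This is a sketch under stated assumptions: concatModel and the { events, completeAt } shape are hypothetical, and times are plain numbers rather than real timers.

```javascript
// Model: a source is { events: [{ at, value }], completeAt }, with times in
// milliseconds relative to the moment that source is subscribed to.
function concatModel(...sources) {
  const output = [];
  let offset = 0; // absolute time at which the next source is subscribed
  for (const { events, completeAt } of sources) {
    for (const { at, value } of events) {
      output.push({ at: offset + at, value });
    }
    offset += completeAt; // the next source starts only after completion
  }
  return output;
}

// interval(1000).take(4): values 0..3, completing with the last value.
const timer = {
  events: [0, 1, 2, 3].map(i => ({ at: (i + 1) * 1000, value: i })),
  completeAt: 4000,
};
// range(1, 10): ten values emitted synchronously on subscription.
const sequence = {
  events: Array.from({ length: 10 }, (_, i) => ({ at: 0, value: i + 1 })),
  completeAt: 0,
};
const result = concatModel(timer, sequence);
console.log(result[3].at, result[3].value); // 4000 3
console.log(result[4].at, result[4].value); // 4000 1
```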

public concatAll(): Observable source

Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.

Flattens an Observable-of-Observables by putting one inner Observable after the other.

Joins every Observable emitted by the source (a higher-order Observable), in a serial fashion. It subscribes to each inner Observable only after the previous inner Observable has completed, and merges all of their values into the returned observable.

Warning: If the source Observable emits Observables quickly and endlessly, and the inner Observables it emits generally complete slower than the source emits, you can run into memory issues as the incoming Observables collect in an unbounded buffer.

Note: concatAll is equivalent to mergeAll with concurrency parameter set to 1.

Return:

Observable

An Observable emitting values from all the inner Observables concatenated.

Example:

For each click event, tick every second from 0 to 3, with no concurrency
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map(ev => Rx.Observable.interval(1000).take(4));
var firstOrder = higherOrder.concatAll();
firstOrder.subscribe(x => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
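
The buffering described in the warning above can be made concrete with a small simulation. It is a model only: maxBufferedInners is a hypothetical helper, and it assumes every inner Observable takes the same fixed time to complete.

```javascript
// Model: inner Observables arrive at `arrivalTimes` (ms, ascending) and each
// takes `innerDuration` ms to complete once subscribed. Returns the largest
// number of inner Observables that were waiting in the buffer at once.
function maxBufferedInners(arrivalTimes, innerDuration) {
  let busyUntil = -Infinity; // when the currently active inner completes
  let waiting = 0;           // inners buffered but not yet subscribed to
  let maxWaiting = 0;
  for (const t of arrivalTimes) {
    // Inners that finished by time t hand over to the next buffered inner.
    while (waiting > 0 && busyUntil <= t) {
      waiting -= 1;
      busyUntil += innerDuration;
    }
    if (busyUntil <= t) {
      busyUntil = t + innerDuration; // idle: subscribe immediately
    } else {
      waiting += 1;                  // busy: buffer this inner
      maxWaiting = Math.max(maxWaiting, waiting);
    }
  }
  return maxWaiting;
}

const everyHundredMs = Array.from({ length: 10 }, (_, i) => i * 100);
console.log(maxBufferedInners(everyHundredMs, 1000)); // 9
console.log(maxBufferedInners(everyHundredMs, 50));   // 0
```

When inners complete faster than the source emits, the buffer stays empty; otherwise it grows with every source emission.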

public concatMap(project: function(value: T, ?index: number): Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

Maps each value to an Observable, then flattens all of these inner Observables using concatAll.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an (so-called "inner") Observable. Each new inner Observable is concatenated with the previous inner Observable.

Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.

Note: concatMap is equivalent to mergeMap with concurrency parameter set to 1.

Params:

Name Type Attribute Description
project function(value: T, ?index: number): Observable

A function that, when applied to an item emitted by the source Observable, returns an Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

Return:

Observable

An Observable that emits the result of applying the projection function (and the optional resultSelector) to each item emitted by the source Observable and taking values from each projected inner Observable sequentially.

Example:

For each click event, tick every second from 0 to 3, with no concurrency
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMap(ev => Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
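
The four resultSelector arguments are easier to see on synchronous data. The sketch below models the source and each projected Observable as plain arrays; concatMapModel is a hypothetical stand-in, not the operator's implementation.

```javascript
// Model: the source and every projected inner Observable are plain arrays.
function concatMapModel(source, project, resultSelector) {
  const output = [];
  source.forEach((outerValue, outerIndex) => {
    const inner = project(outerValue, outerIndex);
    // Arrays complete immediately, so each inner is drained before the next.
    inner.forEach((innerValue, innerIndex) => {
      output.push(
        resultSelector
          ? resultSelector(outerValue, innerValue, outerIndex, innerIndex)
          : innerValue
      );
    });
  });
  return output;
}

const labelled = concatMapModel(
  ['a', 'b'],
  () => [1, 2, 3],
  (outerValue, innerValue, outerIndex, innerIndex) =>
    `${outerValue}${innerValue}@${outerIndex}.${innerIndex}`
);
console.log(labelled.join(' '));
// a1@0.0 a2@0.1 a3@0.2 b1@1.0 b2@1.1 b3@1.2
```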

public concatMapTo(innerObservable: Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

Projects each source value to the same Observable which is merged multiple times in a serialized fashion on the output Observable.

It's like concatMap, but maps each value always to the same inner Observable.

Maps each source value to the given Observable innerObservable regardless of the source value, and then flattens those resulting Observables into one single Observable, which is the output Observable. Each new innerObservable instance emitted on the output Observable is concatenated with the previous innerObservable instance.

Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.

Note: concatMapTo is equivalent to mergeMapTo with concurrency parameter set to 1.

Params:

Name Type Attribute Description
innerObservable Observable

An Observable to replace each value from the source Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

Return:

Observable

An observable of values merged together by joining the passed observable with itself, one after the other, for each value emitted from the source.

Example:

For each click event, tick every second from 0 to 3, with no concurrency
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMapTo(Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3

public count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable source

Counts the number of emissions on the source and emits that number when the source completes.

Tells how many values were emitted, when the source completes.

count transforms an Observable that emits values into an Observable that emits a single value that represents the number of values emitted by the source Observable. If the source Observable terminates with an error, count will pass this error notification along without emitting a value first. If the source Observable does not terminate at all, count will neither emit a value nor terminate. This operator takes an optional predicate function as argument, in which case the output emission will represent the number of source values for which the predicate returned true.

Params:

Name Type Attribute Description
predicate function(value: T, i: number, source: Observable<T>): boolean
  • optional

A boolean function to select what values are to be counted. It is provided with arguments of:

  • value: the value from the source Observable.
  • index: the (zero-based) "index" of the value from the source Observable.
  • source: the source Observable instance itself.

Return:

Observable

An Observable of one number that represents the count as described above.

Example:

Counts how many seconds have passed before the first click happened
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var secondsBeforeClick = seconds.takeUntil(clicks);
var result = secondsBeforeClick.count();
result.subscribe(x => console.log(x));

Counts how many odd numbers there are between 1 and 7
var numbers = Rx.Observable.range(1, 7);
var result = numbers.count(i => i % 2 === 1);
result.subscribe(x => console.log(x));

// Results in:
// 4
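
The three termination cases described above (completion, error, never terminating) can be modelled on a list of notifications. This is an illustrative sketch: countModel and the { kind, value } notification shape are hypothetical, and the model omits the predicate's third (source) argument.

```javascript
// Model: the source is a list of notifications, { kind: 'N', value },
// { kind: 'E', error } or { kind: 'C' }. The result is what count would emit.
function countModel(notifications, predicate) {
  let total = 0;
  let index = 0;
  for (const n of notifications) {
    if (n.kind === 'E') return [n]; // error passes through, no count emitted
    if (n.kind === 'C') return [{ kind: 'N', value: total }, n];
    if (!predicate || predicate(n.value, index)) total += 1;
    index += 1;
  }
  return []; // source never terminated: nothing is emitted
}

const oneToSeven = [1, 2, 3, 4, 5, 6, 7].map(value => ({ kind: 'N', value }));
const counted = countModel([...oneToSeven, { kind: 'C' }], i => i % 2 === 1);
console.log(counted[0].value); // 4
```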

public debounce(durationSelector: function(value: T): Observable | Promise): Observable source

Emits a value from the source Observable only after a particular time span determined by another Observable has passed without another source emission.

It's like debounceTime, but the time span of emission silence is determined by a second Observable.

debounce delays values emitted by the source Observable, but drops previous pending delayed emissions if a new value arrives on the source Observable. This operator keeps track of the most recent value from the source Observable, and spawns a duration Observable by calling the durationSelector function. The value is emitted only when the duration Observable emits a value or completes, and if no other value was emitted on the source Observable since the duration Observable was spawned. If a new value appears before the duration Observable emits, the previous value will be dropped and will not be emitted on the output Observable.

Like debounceTime, this is a rate-limiting operator, and also a delay-like operator since output emissions do not necessarily occur at the same time as they did on the source Observable.

Params:

Name Type Attribute Description
durationSelector function(value: T): Observable | Promise

A function that receives a value from the source Observable, for computing the timeout duration for each source value, returned as an Observable or a Promise.

Return:

Observable

An Observable that delays the emissions of the source Observable by the specified duration Observable returned by durationSelector, and may drop some values if they occur too frequently.

Example:

Emit the most recent click after a burst of clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounce(() => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

public debounceTime(dueTime: number, scheduler: Scheduler): Observable source

Emits a value from the source Observable only after a particular time span has passed without another source emission.

It's like delay, but passes only the most recent value from each burst of emissions.

debounceTime delays values emitted by the source Observable, but drops previous pending delayed emissions if a new value arrives on the source Observable. This operator keeps track of the most recent value from the source Observable, and emits it only once dueTime has passed without any other value appearing on the source Observable. If a new value appears before dueTime of silence has elapsed, the previous value will be dropped and will not be emitted on the output Observable.

This is a rate-limiting operator, because it is impossible for more than one value to be emitted in any time window of duration dueTime, but it is also a delay-like operator since output emissions do not occur at the same time as they did on the source Observable. Optionally takes an IScheduler for managing timers.

Params:

Name Type Attribute Description
dueTime number

The timeout duration in milliseconds (or the time unit determined internally by the optional scheduler) for the window of time required to wait for emission silence before emitting the most recent source value.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for managing the timers that handle the timeout for each value.

Return:

Observable

An Observable that delays the emissions of the source Observable by the specified dueTime, and may drop some values if they occur too frequently.

Example:

Emit the most recent click after a burst of clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounceTime(1000);
result.subscribe(x => console.log(x));
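
Which values survive a burst can be worked out on timestamped events without real timers. The sketch below is a model under stated assumptions: debounceTimeModel is a hypothetical helper, the source is assumed never to complete, and a value arriving exactly dueTime after the previous one is treated as replacing it.

```javascript
// Model: `events` is a list of { at, value } sorted by time (ms).
function debounceTimeModel(events, dueTime) {
  const output = [];
  events.forEach((event, i) => {
    const next = events[i + 1];
    // A value survives only if no other value arrives within dueTime of it.
    if (!next || next.at - event.at > dueTime) {
      output.push({ at: event.at + dueTime, value: event.value });
    }
  });
  return output;
}

// Two bursts of clicks: three quick clicks, then two more later.
const clickEvents = [0, 200, 400, 2000, 2100].map(at => ({ at, value: `click@${at}` }));
const debounced = debounceTimeModel(clickEvents, 1000);
console.log(debounced.map(e => `${e.value} at ${e.at}`).join(', '));
// click@400 at 1400, click@2100 at 3100
```

Only the last click of each burst is emitted, one dueTime after it occurred.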

public defaultIfEmpty(defaultValue: any): Observable source

Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.

If the source Observable turns out to be empty, then this operator will emit a default value.

defaultIfEmpty emits the values emitted by the source Observable or a specified default value if the source Observable is empty (completes without having emitted any next value).

Params:

Name Type Attribute Description
defaultValue any
  • optional
  • default: null

The default value used if the source Observable is empty.

Return:

Observable

An Observable that emits either the specified defaultValue if the source Observable emits no items, or the values emitted by the source Observable.

Example:

If no clicks happen in 5 seconds, then emit "no clicks"
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksBeforeFive = clicks.takeUntil(Rx.Observable.interval(5000));
var result = clicksBeforeFive.defaultIfEmpty('no clicks');
result.subscribe(x => console.log(x));

public delay(delay: number | Date, scheduler: Scheduler): Observable source

Delays the emission of items from the source Observable by a given timeout or until a given Date.

Time shifts each item by some specified amount of milliseconds.

If the delay argument is a Number, this operator time shifts the source Observable by that amount of time expressed in milliseconds. The relative time intervals between the values are preserved.

If the delay argument is a Date, this operator time shifts the start of the Observable execution until the given date occurs.

Params:

Name Type Attribute Description
delay number | Date

The delay duration in milliseconds (a number) or a Date until which the emission of the source items is delayed.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for managing the timers that handle the time-shift for each item.

Return:

Observable

An Observable that delays the emissions of the source Observable by the specified timeout or Date.

Example:

Delay each click by one second
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
delayedClicks.subscribe(x => console.log(x));

Delay all clicks until a future date
var clicks = Rx.Observable.fromEvent(document, 'click');
var date = new Date('March 15, 2050 12:00:00'); // in the future
var delayedClicks = clicks.delay(date); // click emitted only after that date
delayedClicks.subscribe(x => console.log(x));

public delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable source

Delays the emission of items from the source Observable by a given time span determined by the emissions of another Observable.

It's like delay, but the time span of the delay duration is determined by a second Observable.

delayWhen time shifts each emitted value from the source Observable by a time span determined by another Observable. When the source emits a value, the delayDurationSelector function is called with the source value as argument, and should return an Observable, called the "duration" Observable. The source value is emitted on the output Observable only when the duration Observable emits a value or completes.

Optionally, delayWhen takes a second argument, subscriptionDelay, which is an Observable. When subscriptionDelay emits its first value or completes, the source Observable is subscribed to and starts behaving as described in the previous paragraph. If subscriptionDelay is not provided, delayWhen will subscribe to the source Observable as soon as the output Observable is subscribed.

Params:

Name Type Attribute Description
delayDurationSelector function(value: T): Observable

A function that returns an Observable for each value emitted by the source Observable, which is then used to delay the emission of that item on the output Observable until the Observable returned from this function emits a value.

subscriptionDelay Observable

An Observable that triggers the subscription to the source Observable once it emits any value.

Return:

Observable

An Observable that delays the emissions of the source Observable by an amount of time specified by the Observable returned by delayDurationSelector.

Example:

Delay each click by a random amount of time, between 0 and 5 seconds
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delayWhen(event =>
  Rx.Observable.interval(Math.random() * 5000)
);
delayedClicks.subscribe(x => console.log(x));
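
Because each value waits on its own duration Observable, outputs need not keep the source order. The sketch below illustrates this with plain numbers. It is a simplified model: delayWhenModel is hypothetical, and durationSelector here returns the number of milliseconds until the duration Observable would first emit, rather than an Observable.

```javascript
// Model: `events` is a list of { at, value } with times in milliseconds.
function delayWhenModel(events, durationSelector) {
  return events
    .map(({ at, value }) => ({ at: at + durationSelector(value), value }))
    .sort((a, b) => a.at - b.at); // outputs appear in order of release time
}

const sourceEvents = [
  { at: 0, value: 'slow' },
  { at: 100, value: 'fast' },
];
const delays = { slow: 3000, fast: 500 };
const released = delayWhenModel(sourceEvents, value => delays[value]);
console.log(released.map(e => `${e.value}@${e.at}`).join(' ')); // fast@600 slow@3000
```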

public dematerialize(): Observable source

Converts an Observable of Notification objects into the emissions that they represent.

Unwraps Notification objects as actual next, error and complete emissions. The opposite of materialize.

dematerialize is assumed to operate on an Observable that only emits Notification objects as next emissions, and does not emit any error. Such an Observable is the output of a materialize operation. Those notifications are then unwrapped using the metadata they contain, and emitted as next, error, and complete on the output Observable.

Use this operator in conjunction with materialize.

Return:

Observable

An Observable that emits items and notifications embedded in Notification objects emitted by the source Observable.

Example:

Convert an Observable of Notifications to an actual Observable
var notifA = new Rx.Notification('N', 'A');
var notifB = new Rx.Notification('N', 'B');
var notifE = new Rx.Notification('E', void 0,
  new TypeError('x.toUpperCase is not a function')
);
var materialized = Rx.Observable.of(notifA, notifB, notifE);
var upperCase = materialized.dematerialize();
upperCase.subscribe(x => console.log(x), e => console.error(e));

// Results in:
// A
// B
// TypeError: x.toUpperCase is not a function
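
The unwrapping step can be sketched without RxJS by replaying plain notification objects onto an observer. dematerializeModel and the object shape below are hypothetical; the kind letters 'N', 'E' and 'C' follow the Notification arguments used in the example above.

```javascript
// Model: notifications are plain objects with kind 'N' (next),
// 'E' (error) or 'C' (complete).
function dematerializeModel(notifications, observer) {
  for (const { kind, value, error } of notifications) {
    if (kind === 'N') {
      observer.next(value);
    } else if (kind === 'E') {
      observer.error(error);
      return; // nothing is delivered after an error
    } else if (kind === 'C') {
      observer.complete();
      return; // nothing is delivered after completion
    }
  }
}

const log = [];
dematerializeModel(
  [
    { kind: 'N', value: 'A' },
    { kind: 'N', value: 'B' },
    { kind: 'E', error: new TypeError('x.toUpperCase is not a function') },
    { kind: 'N', value: 'never delivered' },
  ],
  {
    next: v => log.push(`next ${v}`),
    error: e => log.push(`error ${e.message}`),
    complete: () => log.push('complete'),
  }
);
console.log(log.join(' | '));
// next A | next B | error x.toUpperCase is not a function
```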

public distinct(keySelector: function, flushes: Observable): Observable source

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.

If a keySelector function is provided, then it will project each value from the source observable into a new value that it will check for equality with previously projected values. If a keySelector function is not provided, it will use each value from the source observable directly with an equality check against previous values.

In JavaScript runtimes that support Set, this operator will use a Set to improve performance of the distinct value checking.

In other runtimes, this operator will use a minimal implementation of Set that relies on an Array and indexOf under the hood, so performance will degrade as more values are checked for distinction. Even in newer browsers, a long-running distinct use might result in memory leaks. To help alleviate this in some scenarios, an optional flushes parameter is also provided so that the internal Set can be "flushed", basically clearing it of values.

Params:

Name Type Attribute Description
keySelector function
  • optional

Optional function to select which value you want to check as distinct.

flushes Observable
  • optional

Optional Observable for flushing the internal Set of the operator.

Return:

Observable

an Observable that emits items from the source Observable with distinct values.

Example:

A simple example with numbers
Observable.of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)
  .distinct()
  .subscribe(x => console.log(x)); // 1, 2, 3, 4

An example using a keySelector function
interface Person {
   age: number,
   name: string
}

Observable.of<Person>(
    { age: 4, name: 'Foo'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo'})
    .distinct((p: Person) => p.name)
    .subscribe(x => console.log(x));

// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
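
The effect of the flushes parameter is not shown above. The sketch below models it in plain JavaScript, with the caveat that distinctModel and the { value } / { flush: true } step objects are hypothetical stand-ins for source emissions and emissions of the flushes Observable.

```javascript
// Model: `steps` is a list of { value } entries, with { flush: true } marking
// the moments at which the flushes Observable would emit.
function distinctModel(steps, keySelector) {
  const seen = new Set();
  const output = [];
  for (const step of steps) {
    if (step.flush) {
      seen.clear(); // forget every key seen so far
      continue;
    }
    const key = keySelector ? keySelector(step.value) : step.value;
    if (!seen.has(key)) {
      seen.add(key);
      output.push(step.value);
    }
  }
  return output;
}

const v = value => ({ value });
console.log(distinctModel([v(1), v(1), v(2), v(1), v(3)]).join(', '));
// 1, 2, 3
console.log(distinctModel([v(1), v(2), { flush: true }, v(1), v(2)]).join(', '));
// 1, 2, 1, 2
```

After a flush, values seen earlier are treated as new again, which bounds the memory held by the Set.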

public distinctUntilChanged(compare: function): Observable source

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.

If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.

If a comparator function is not provided, an equality check is used by default.

Params:

Name Type Attribute Description
compare function
  • optional

optional comparison function called to test if an item is distinct from the previous item in the source.

Return:

Observable

an Observable that emits items from the source Observable with distinct values.

Example:

A simple example with numbers
Observable.of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4)
  .distinctUntilChanged()
  .subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4

An example using a compare function
interface Person {
   age: number,
   name: string
}

Observable.of<Person>(
    { age: 4, name: 'Foo'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo'},
    { age: 6, name: 'Foo'})
    .distinctUntilChanged((p: Person, q: Person) => p.name === q.name)
    .subscribe(x => console.log(x));

// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }
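
Unlike distinct, this operator only remembers the last emitted item. A minimal model on arrays makes the difference visible; distinctUntilChangedModel is a hypothetical helper, and the assumption that compare receives the previous item first and the current item second is part of the sketch.

```javascript
// Model: `values` is everything the source emits, in order.
function distinctUntilChangedModel(values, compare = (a, b) => a === b) {
  const output = [];
  let hasPrevious = false;
  let previous;
  for (const value of values) {
    // compare returning true means "same as the previous item": skip it.
    if (hasPrevious && compare(previous, value)) continue;
    output.push(value);
    previous = value;
    hasPrevious = true;
  }
  return output;
}

console.log(distinctUntilChangedModel([1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4]).join(', '));
// 1, 2, 1, 2, 3, 4

const people = [
  { age: 4, name: 'Foo' },
  { age: 7, name: 'Bar' },
  { age: 5, name: 'Foo' },
  { age: 6, name: 'Foo' },
];
const byName = distinctUntilChangedModel(people, (p, q) => p.name === q.name);
console.log(byName.map(p => p.age).join(', ')); // 4, 7, 5
```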

public distinctUntilKeyChanged(key: string, compare: function): Observable source

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item, using a property accessed by using the key provided to check if the two items are distinct.

If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.

If a comparator function is not provided, an equality check is used by default.

Params:

Name Type Attribute Description
key string

string key for object property lookup on each item.

compare function
  • optional

optional comparison function called to test if an item is distinct from the previous item in the source.

Return:

Observable

an Observable that emits items from the source Observable with distinct values based on the key specified.

Example:

An example comparing the name of persons

interface Person {
   age: number,
   name: string
}

Observable.of<Person>(
    { age: 4, name: 'Foo'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo'},
    { age: 6, name: 'Foo'})
    .distinctUntilKeyChanged('name')
    .subscribe(x => console.log(x));

// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }

An example comparing the first letters of the name

interface Person {
   age: number,
   name: string
}

Observable.of<Person>(
    { age: 4, name: 'Foo1'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo2'},
    { age: 6, name: 'Foo3'})
    .distinctUntilKeyChanged('name', (x: string, y: string) => x.substring(0, 3) === y.substring(0, 3))
    .subscribe(x => console.log(x));

// displays:
// { age: 4, name: 'Foo1' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo2' }

public do(nextOrObserver: Observer | function, error: function, complete: function): Observable source

Perform a side effect for every emission on the source Observable, but return an Observable that is identical to the source.

Intercepts each emission on the source and runs a function, but returns an output which is identical to the source.

Returns a mirrored Observable of the source Observable, but modified so that the provided Observer is called to perform a side effect for every value, error, and completion emitted by the source. Any errors that are thrown in the aforementioned Observer or handlers are safely sent down the error path of the output Observable.

This operator is useful for debugging your Observables for the correct values or performing other side effects.

Note: this is different from a subscribe on the Observable. If the Observable returned by do is not subscribed, the side effects specified by the Observer will never happen. do therefore simply spies on an existing execution; it does not trigger an execution to happen like subscribe does.

Params:

Name Type Attribute Description
nextOrObserver Observer | function
  • optional

A normal Observer object or a callback for next.

error function
  • optional

Callback for errors in the source.

complete function
  • optional

Callback for the completion of the source.

Return:

Observable

An Observable identical to the source, but runs the specified Observer or callback(s) for each item.

Example:

Map every click to the clientX position of that click, while also logging the click event
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks
  .do(ev => console.log(ev))
  .map(ev => ev.clientX);
positions.subscribe(x => console.log(x));
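
The point made in the note, that the side effect only runs once something subscribes, can be demonstrated with a tiny hand-rolled model. Here an "observable" is just a function that takes an observer; doModel and this representation are hypothetical and much simpler than the real operator.

```javascript
// Model: an "observable" is a function that, given an observer, starts
// producing values. Calling it plays the role of subscribe.
const numbers = observer => {
  observer.next(1);
  observer.next(2);
  observer.complete();
};

// Wraps a source so sideEffect runs for each value before it is passed on.
function doModel(input, sideEffect) {
  return observer => {
    let stopped = false;
    input({
      next: value => {
        if (stopped) return;
        try {
          sideEffect(value);
        } catch (err) {
          stopped = true;
          observer.error(err); // a throwing side effect goes down the error path
          return;
        }
        observer.next(value);
      },
      error: err => { if (!stopped) observer.error(err); },
      complete: () => { if (!stopped) observer.complete(); },
    });
  };
}

const spied = [];
const withSpy = doModel(numbers, value => spied.push(value));
console.log(spied.length); // 0, because nothing has subscribed yet

const received = [];
withSpy({
  next: value => received.push(value),
  error: () => {},
  complete: () => received.push('done'),
});
console.log(spied.join(','), received.join(',')); // 1,2 1,2,done
```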

public elementAt(index: number, defaultValue: T): Observable source

Emits the single value at the specified index in a sequence of emissions from the source Observable.

Emits only the i-th value, then completes.

elementAt returns an Observable that emits the item at the specified index in the source Observable, or a default value if that index is out of range and the default argument is provided. If the default argument is not given and the index is out of range, the output Observable will emit an ArgumentOutOfRangeError error.

Params:

Name Type Attribute Description
index number

The zero-based index i of the source emission to select, counted from the moment of subscription.

defaultValue T
  • optional

The default value returned for missing indices.

Return:

Observable

An Observable that emits a single item, if it is found. Otherwise, will emit the default value if given. If not, then emits an error.

Throw:

ArgumentOutOfRangeError

When using elementAt(i), it delivers an ArgumentOutOfRangeError to the Observer's error callback if i < 0 or the Observable has completed before emitting the i-th next notification.

Example:

Emit only the third click event
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.elementAt(2);
result.subscribe(x => console.log(x));

// Results in:
// click 1 = nothing
// click 2 = nothing
// click 3 = MouseEvent object logged to console
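
The three possible outcomes (item found, default used, error) can be summarised in a small model on arrays. This is a sketch with assumptions: elementAtModel is hypothetical, the error class is a local stand-in for RxJS's ArgumentOutOfRangeError, and a negative index is treated as an error even when a default is supplied.

```javascript
// Local stand-in for RxJS's ArgumentOutOfRangeError, so the sketch has
// no dependencies.
class ArgumentOutOfRangeError extends Error {}

// Model: `values` is everything the source emits before completing. The
// result is either { value } or { error }. The rest parameter distinguishes
// "no default given" from "default is undefined".
function elementAtModel(values, index, ...rest) {
  if (index >= 0 && index < values.length) {
    return { value: values[index] };
  }
  if (index >= 0 && rest.length > 0) {
    return { value: rest[0] }; // source completed early: use the default
  }
  return { error: new ArgumentOutOfRangeError('argument out of range') };
}

console.log(elementAtModel(['a', 'b', 'c'], 2).value);   // c
console.log(elementAtModel(['a'], 2, 'none').value);     // none
console.log(elementAtModel(['a'], 2).error instanceof ArgumentOutOfRangeError); // true
```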

public every(predicate: function, thisArg: any): Observable source

Returns an Observable that emits whether or not every item of the source satisfies the condition specified.

Params:

Name Type Attribute Description
predicate function

a function for determining if an item meets a specified condition.

thisArg any
  • optional

optional object to use for this in the callback

Return:

Observable

an Observable of booleans that determines if all items of the source Observable meet the condition specified.

Example:

A simple example emitting true if all elements are less than 5, false otherwise
 Observable.of(1, 2, 3, 4, 5, 6)
    .every(x => x < 5)
    .subscribe(x => console.log(x)); // -> false
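
A plain-array model of the same check is shown below. It assumes, beyond what this page states, that the result is decided at the first item that fails the predicate, so later items are never tested; everyModel is a hypothetical helper.

```javascript
// Model: `values` is everything the source emits before completing.
function everyModel(values, predicate) {
  let index = 0;
  for (const value of values) {
    if (!predicate(value, index)) return false; // first failure decides
    index += 1;
  }
  return true; // source completed without a failure
}

let calls = 0;
const allBelowFive = everyModel([1, 2, 3, 4, 5, 6], x => {
  calls += 1;
  return x < 5;
});
console.log(allBelowFive, calls); // false 5
```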

public exhaust(): Observable source

Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.

Flattens an Observable-of-Observables by dropping the next inner Observables while the current inner is still executing.

exhaust subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, the output Observable begins emitting the items emitted by that inner Observable. So far, it behaves like mergeAll. However, exhaust ignores every new inner Observable if the previous Observable has not yet completed. Once that one completes, it will accept and flatten the next inner Observable and repeat this process.

Return:

Observable

Returns an Observable that takes a source of Observables and propagates the first observable exclusively until it completes before subscribing to the next.

Example:

Run a finite timer for each click, only if there is no currently active timer
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(5));
var result = higherOrder.exhaust();
result.subscribe(x => console.log(x));
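
Which inner Observables are accepted and which are dropped can be worked out from arrival times alone. The sketch below is a model, not the operator: exhaustModel is hypothetical, every inner Observable is assumed to run for the same fixed duration, and an arrival at the exact moment of completion is treated as accepted.

```javascript
// Model: inner Observables arrive at `arrivalTimes` (ms, ascending) and each
// runs for `innerDuration` ms. Returns the arrival times that are accepted.
function exhaustModel(arrivalTimes, innerDuration) {
  const accepted = [];
  let busyUntil = -Infinity; // when the currently active inner completes
  for (const t of arrivalTimes) {
    if (t >= busyUntil) {
      accepted.push(t);
      busyUntil = t + innerDuration;
    }
    // Otherwise an inner is still running and this arrival is dropped.
  }
  return accepted;
}

// Five clicks, each starting a 5-second timer if none is active.
console.log(exhaustModel([0, 1000, 4000, 6000, 9500], 5000).join(', ')); // 0, 6000
```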

public exhaustMap(project: function(value: T, ?index: number): Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.

Maps each value to an Observable, then flattens all of these inner Observables using exhaust.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an (so-called "inner") Observable. When it projects a source value to an Observable, the output Observable begins emitting the items emitted by that projected Observable. However, exhaustMap ignores every new projected Observable if the previous projected Observable has not yet completed. Once that one completes, it will accept and flatten the next projected Observable and repeat this process.

Params:

Name Type Attribute Description
project function(value: T, ?index: number): Observable

A function that, when applied to an item emitted by the source Observable, returns an Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

Return:

Observable

An Observable containing projected Observables of each item of the source, ignoring projected Observables that start before their preceding Observable has completed.

Example:

Run a finite timer for each click, only if there is no currently active timer
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.exhaustMap((ev) => Rx.Observable.interval(1000).take(5));
result.subscribe(x => console.log(x));

public expand(project: function(value: T, index: number), concurrent: number, scheduler: Scheduler): Observable source

Recursively projects each source value to an Observable which is merged in the output Observable.

It's similar to mergeMap, but applies the projection function to every source value as well as every output value. It's recursive.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger. Expand will re-emit on the output Observable every source value. Then, each output value is given to the project function which returns an inner Observable to be merged on the output Observable. Those output values resulting from the projection are also given to the project function to produce new output values. This is how expand behaves recursively.

Params:

Name Type Attribute Description
project function(value: T, index: number)

A function that, when applied to an item emitted by the source or the output Observable, returns an Observable.

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

scheduler Scheduler
  • optional
  • default: null

The IScheduler to use for subscribing to each projected inner Observable.

Return:

Observable

An Observable that emits the source values and also the result of applying the projection function to each value emitted on the output Observable, merging the results of the Observables obtained from this transformation.

Example:

Start emitting the powers of two on every click, at most 10 of them
var clicks = Rx.Observable.fromEvent(document, 'click');
var powersOfTwo = clicks
  .mapTo(1)
  .expand(x => Rx.Observable.of(2 * x).delay(1000))
  .take(10);
powersOfTwo.subscribe(x => console.log(x));

public filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable source

Filter items emitted by the source Observable by only emitting those that satisfy a specified predicate.

Like Array.prototype.filter(), it only emits a value from the source if it passes a criterion function.

Similar to the well-known Array.prototype.filter method, this operator takes values from the source Observable, passes them through a predicate function and only emits those values that yielded true.

Params:

Name Type Attribute Description
predicate function(value: T, index: number): boolean

A function that evaluates each value emitted by the source Observable. If it returns true, the value is emitted; if it returns false, the value is not passed to the output Observable. The index parameter is the number i for the i-th source emission that has happened since the subscription, starting from the number 0.

thisArg any
  • optional

An optional argument to determine the value of this in the predicate function.

Return:

Observable

An Observable of values from the source that were allowed by the predicate function.

Example:

Emit only click events whose target was a DIV element
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksOnDivs = clicks.filter(ev => ev.target.tagName === 'DIV');
clicksOnDivs.subscribe(x => console.log(x));

public find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T> source

Emits only the first value emitted by the source Observable that meets some condition.

Finds the first value that passes some test and emits that.

find searches for the first item in the source Observable that matches the specified condition embodied by the predicate, and returns the first occurrence in the source. Unlike first, find requires the predicate, and it does not emit an error if a valid value is not found.

Params:

Name Type Attribute Description
predicate function(value: T, index: number, source: Observable<T>): boolean

A function called with each item to test for condition matching.

thisArg any
  • optional

An optional argument to determine the value of this in the predicate function.

Return:

Observable<T>

An Observable of the first item that matches the condition.

Example:

Find and emit the first click that happens on a DIV element
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.find(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

public findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable source

Emits only the index of the first value emitted by the source Observable that meets some condition.

It's like find, but emits the index of the found value, not the value itself.

findIndex searches for the first item in the source Observable that matches the specified condition embodied by the predicate, and returns the (zero-based) index of the first occurrence in the source. Unlike first, findIndex requires the predicate, and it does not emit an error if a valid value is not found.

Params:

Name Type Attribute Description
predicate function(value: T, index: number, source: Observable<T>): boolean

A function called with each item to test for condition matching.

thisArg any
  • optional

An optional argument to determine the value of this in the predicate function.

Return:

Observable

An Observable of the index of the first item that matches the condition.

Example:

Emit the index of the first click that happens on a DIV element
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.findIndex(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

public first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R> source

Emits only the first value (or the first value that meets some condition) emitted by the source Observable.

Emits only the first value. Or emits only the first value that passes some test.

If called with no arguments, first emits the first value of the source Observable, then completes. If called with a predicate function, first emits the first value of the source that matches the specified condition. It may also take a resultSelector function to produce the output value from the input value, and a defaultValue to emit in case the source completes before it is able to emit a valid value. Throws an error if defaultValue was not provided and a matching element is not found.

Params:

Name Type Attribute Description
predicate function(value: T, index: number, source: Observable<T>): boolean
  • optional

An optional function called with each item to test for condition matching.

resultSelector function(value: T, index: number): R
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source Observable. The arguments passed to this function are:

  • value: the value that was emitted on the source.
  • index: the "index" of the value from the source.

defaultValue R
  • optional

The default value emitted in case no valid value was found on the source.

Return:

Observable<T | R>

an Observable of the first item that matches the condition.

Throw:

EmptyError

Delivers an EmptyError to the Observer's error callback if the Observable completes before any next notification was sent.

Example:

Emit only the first click that happens on the DOM
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first();
result.subscribe(x => console.log(x));

Emit only the first click that happens on a DIV
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

public forEach(next: Function, PromiseCtor: PromiseConstructor): Promise source

Params:

Name Type Attribute Description
next Function

a handler for each value emitted by the observable

PromiseCtor PromiseConstructor
  • optional

a constructor function used to instantiate the Promise

Return:

Promise

a promise that either resolves on observable completion or rejects with the handled error
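
Example:

As a rough illustration of the contract described above, the following plain-JavaScript sketch models forEach without using RxJS. The fromArray helper and the free-standing forEach function are hypothetical stand-ins introduced here (assumptions, not the library's implementation); with RxJS you would call observable.forEach(next) on an existing Observable.

```javascript
// Hypothetical stand-in for a synchronous Observable (not part of RxJS):
// an object whose subscribe method takes next, error and complete callbacks.
function fromArray(items) {
  return {
    subscribe(next, error, complete) {
      try {
        items.forEach(item => next(item));
      } catch (err) {
        error(err); // a throwing handler is reported through the error callback
        return;
      }
      complete();
    }
  };
}

// Sketch of forEach: subscribe, call `next` for every value, and settle a
// promise when the source completes (resolve) or fails (reject).
function forEach(source, next, PromiseCtor = Promise) {
  return new PromiseCtor((resolve, reject) => {
    source.subscribe(next, reject, resolve);
  });
}

const seen = [];
const done = forEach(fromArray([1, 2, 3]), x => seen.push(x));
done.then(() => console.log('completed after', seen));
```

Because the model source emits synchronously, seen is already filled when forEach returns; with an asynchronous source, the values are only complete once the promise resolves.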

public groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>> source

Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.

Params:

Name Type Attribute Description
keySelector function(value: T): K

a function that extracts the key for each item.

elementSelector function(value: T): R
  • optional

a function that extracts the return element for each item.

durationSelector function(grouped: GroupedObservable<K, R>): Observable<any>
  • optional

a function that returns an Observable to determine how long each group should exist.

Return:

Observable<GroupedObservable<K, R>>

an Observable that emits GroupedObservables, each of which corresponds to a unique key value and each of which emits those items from the source Observable that share that key value.
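
Example:

The grouping rule can be sketched on a plain array, assuming every value is available up front. This is only a model of the keySelector and elementSelector roles, not the RxJS implementation: each Map entry stands in for one GroupedObservable, and the function name and sample data are hypothetical. durationSelector is not modelled.

```javascript
// Array-based model of groupBy: one Map entry per distinct key, in order of
// first appearance, holding the selected elements that share that key.
function groupBy(values, keySelector, elementSelector = value => value) {
  const groups = new Map();
  for (const value of values) {
    const key = keySelector(value);
    if (!groups.has(key)) {
      groups.set(key, []); // a new group opens the first time a key is seen
    }
    groups.get(key).push(elementSelector(value));
  }
  return groups;
}

const people = [
  { name: 'Sue', age: 25 },
  { name: 'Joe', age: 30 },
  { name: 'Frank', age: 25 }
];
const byAge = groupBy(people, person => person.age, person => person.name);
console.log([...byAge]); // [[25, ['Sue', 'Frank']], [30, ['Joe']]]
```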

public ignoreElements(): Observable source

Ignores all items emitted by the source Observable and only passes calls of complete or error.

Return:

Observable

an empty Observable that only calls complete or error, based on which one is called by the source Observable.

public isEmpty(): Observable source

If the source Observable is empty it returns an Observable that emits true, otherwise it emits false.

Return:

Observable

an Observable that emits a Boolean.
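
Example:

A minimal sketch of the rule above, written without RxJS: the output emits false as soon as the source emits anything, or true if the source completes first. fromArray and the free-standing isEmpty function are hypothetical helpers used only for this illustration.

```javascript
// Hypothetical stand-in for a synchronous Observable (not part of RxJS).
function fromArray(items) {
  return {
    subscribe(next, error, complete) {
      items.forEach(item => next(item));
      complete();
    }
  };
}

// Sketch of isEmpty: answer exactly once, then complete.
function isEmpty(source) {
  return {
    subscribe(next, error, complete) {
      let decided = false;
      const decide = answer => {
        if (decided) return; // later source notifications are ignored
        decided = true;
        next(answer);
        complete();
      };
      source.subscribe(() => decide(false), error, () => decide(true));
    }
  };
}

const results = [];
const noop = () => {};
isEmpty(fromArray([])).subscribe(x => results.push(x), noop, noop);
isEmpty(fromArray([1, 2])).subscribe(x => results.push(x), noop, noop);
console.log(results); // [true, false]
```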

public last(predicate: function): Observable source

Returns an Observable that emits only the last item emitted by the source Observable. It optionally takes a predicate function as a parameter, in which case, rather than emitting the last item from the source Observable, the resulting Observable will emit the last item from the source Observable that satisfies the predicate.

Params:

Name Type Attribute Description
predicate function

the condition any source emitted item has to satisfy.

Return:

Observable

An Observable that emits only the last item from the source that satisfies the given condition, or delivers an EmptyError to the Observer's error callback if no such item is emitted.

Throw:

EmptyError

Delivers an EmptyError to the Observer's error callback if the Observable completes before any next notification was sent.

EmptyError

Delivered to the Observer's error callback if no items that match the predicate are emitted by the source Observable.
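
Example:

The selection rule for last can be sketched on a plain array, assuming all values are known up front. This is a model only: the local EmptyError class is a stand-in for the library's error type, and the model throws it, whereas the operator delivers it to the Observer's error callback.

```javascript
// Stand-in for the library's EmptyError (assumption for this sketch).
class EmptyError extends Error {}

// Array-based model of last: keep the most recent value that passes the
// predicate, and fail if nothing ever passed.
function last(values, predicate = () => true) {
  let found = false;
  let result;
  values.forEach((value, index) => {
    if (predicate(value, index)) {
      found = true;
      result = value;
    }
  });
  if (!found) {
    throw new EmptyError('no elements in sequence');
  }
  return result;
}

console.log(last([1, 2, 3, 4]));                   // 4
console.log(last([1, 2, 3, 4], x => x % 2 === 1)); // 3
```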

public letProto(func: *): Observable<R> source

Params:

Name Type Attribute Description
func *

Return:

Observable<R>

public lift(operator: Operator): Observable source

Creates a new Observable, with this Observable as the source, and the passed operator defined as the new observable's operator.

Params:

Name Type Attribute Description
operator Operator

the operator defining the operation to take on the observable

Return:

Observable

a new observable with the Operator applied
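
Example:

The following sketch shows the idea behind lift in isolation. TinyObservable is a hypothetical class written for this illustration, and the Operator is assumed to be an object with a call(subscriber, source) method; the subscriber is reduced to a bare next callback, so this is a simplified model rather than the library's code.

```javascript
// Hypothetical miniature Observable, just enough to show how lift chains.
class TinyObservable {
  constructor(subscribe) {
    if (subscribe) {
      this._subscribe = subscribe;
    }
  }

  // Create a new Observable that remembers its source and the operator.
  lift(operator) {
    const observable = new TinyObservable();
    observable.source = this;
    observable.operator = operator;
    return observable;
  }

  // On subscribe, a lifted Observable hands control to its operator, which
  // subscribes to the source with a transformed callback.
  subscribe(next) {
    if (this.operator) {
      return this.operator.call(next, this.source);
    }
    return this._subscribe(next);
  }
}

// An operator that doubles every value (illustrative only).
const doubleOperator = {
  call(next, source) {
    return source.subscribe(x => next(x * 2));
  }
};

const out = [];
new TinyObservable(next => [1, 2, 3].forEach(x => next(x)))
  .lift(doubleOperator)
  .subscribe(x => out.push(x));
console.log(out); // [2, 4, 6]
```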

public map(project: function(value: T, index: number): R, thisArg: any): Observable<R> source

Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable.

Like Array.prototype.map(), it passes each source value through a transformation function to get corresponding output values.

Similar to the well-known Array.prototype.map function, this operator applies a projection to each value and emits that projection in the output Observable.

Params:

Name Type Attribute Description
project function(value: T, index: number): R

The function to apply to each value emitted by the source Observable. The index parameter is the number i for the i-th emission that has happened since the subscription, starting from the number 0.

thisArg any
  • optional

An optional argument to define what this is in the project function.

Return:

Observable<R>

An Observable that emits the values from the source Observable transformed by the given project function.

Example:

Map every click to the clientX position of that click
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));

public mapTo(value: any): Observable source

Emits the given constant value on the output Observable every time the source Observable emits a value.

Like map, but it maps every source value to the same output value every time.

Takes a constant value as argument, and emits that whenever the source Observable emits a value. In other words, ignores the actual source value, and simply uses the emission moment to know when to emit the given value.

Params:

Name Type Attribute Description
value any

The value to map each source value to.

Return:

Observable

An Observable that emits the given value every time the source Observable emits something.

Example:

Map every click to the string 'Hi'
var clicks = Rx.Observable.fromEvent(document, 'click');
var greetings = clicks.mapTo('Hi');
greetings.subscribe(x => console.log(x));

public materialize(): Observable<Notification<T>> source

Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.

Wraps next, error and complete emissions in Notification objects, emitted as next on the output Observable.

materialize returns an Observable that emits a next notification for each next, error, or complete emission of the source Observable. When the source Observable emits complete, the output Observable will emit next as a Notification of type "complete", and then it will emit complete as well. When the source Observable emits error, the output will emit next as a Notification of type "error", and then complete.

This operator is useful for producing metadata of the source Observable, to be consumed as next emissions. Use it in conjunction with dematerialize.

Return:

Observable<Notification<T>>

An Observable that emits Notification objects that wrap the original emissions from the source Observable with metadata.

Example:

Convert a faulty Observable to an Observable of Notifications
var letters = Rx.Observable.of('a', 'b', 13, 'd');
var upperCase = letters.map(x => x.toUpperCase());
var materialized = upperCase.materialize();
materialized.subscribe(x => console.log(x));

// Results in the following:
// - Notification {kind: "N", value: "A", error: undefined, hasValue: true}
// - Notification {kind: "N", value: "B", error: undefined, hasValue: true}
// - Notification {kind: "E", value: undefined, error: TypeError:
//   x.toUpperCase is not a function at MapSubscriber.letters.map.x
//   [as project] (http://1…, hasValue: false}

public max(comparer: Function): Observable source

The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when the source Observable completes it emits a single item: the item with the largest value.

Params:

Name Type Attribute Description
comparer Function
  • optional

Optional comparer function that is used instead of the default to compare the value of two items.

Return:

Observable

An Observable that emits the item with the largest value.

Example:

Get the maximal value of a series of numbers
Rx.Observable.of(5, 4, 7, 2, 8)
  .max()
  .subscribe(x => console.log(x)); // -> 8

Use a comparer function to get the maximal item
interface Person {
  age: number,
  name: string
}
Observable.of<Person>({age: 7, name: 'Foo'},
                      {age: 5, name: 'Bar'},
                      {age: 9, name: 'Beer'})
          .max<Person>((a: Person, b: Person) => a.age < b.age ? -1 : 1)
          .subscribe((x: Person) => console.log(x.name)); // -> 'Beer'

public merge(other: Observable, concurrent: number, scheduler: Scheduler): Observable source

Creates an output Observable which concurrently emits all values from every given input Observable.

Flattens multiple Observables together by blending their values into one Observable.

merge subscribes to each given input Observable (either the source or an Observable given as argument), and simply forwards (without doing any transformation) all the values from all the input Observables to the output Observable. The output Observable only completes once all input Observables have completed. Any error delivered by an input Observable will be immediately emitted on the output Observable.

Params:

Name Type Attribute Description
other Observable

An input Observable to merge with the source Observable. More than one input Observable may be given as arguments.

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

scheduler Scheduler
  • optional
  • default: null

The IScheduler to use for managing concurrency of input Observables.

Return:

Observable

an Observable that emits items that are the result of every input Observable.

Example:

Merge together two Observables: 1s interval and clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = clicks.merge(timer);
clicksOrTimer.subscribe(x => console.log(x));

Merge together 3 Observables, but only 2 run concurrently
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // the argument
var merged = timer1.merge(timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));

public mergeAll(concurrent: number): Observable source

Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.

Flattens an Observable-of-Observables.

mergeAll subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, it subscribes to that and delivers all the values from the inner Observable on the output Observable. The output Observable only completes once all inner Observables have completed. Any error delivered by an inner Observable will be immediately emitted on the output Observable.

Params:

Name Type Attribute Description
concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of inner Observables being subscribed to concurrently.

Return:

Observable

An Observable that emits values coming from all the inner Observables emitted by the source Observable.

Example:

Spawn a new interval Observable for each click event, and blend their outputs as one Observable
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var firstOrder = higherOrder.mergeAll();
firstOrder.subscribe(x => console.log(x));

Count from 0 to 9 every second for each click, but only allow 2 concurrent timers
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(10));
var firstOrder = higherOrder.mergeAll(2);
firstOrder.subscribe(x => console.log(x));

public mergeMap(project: function(value: T, ?index: number): Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable source

Projects each source value to an Observable which is merged in the output Observable.

Maps each value to an Observable, then flattens all of these inner Observables using mergeAll.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.

Params:

Name Type Attribute Description
project function(value: T, ?index: number): Observable

A function that, when applied to an item emitted by the source Observable, returns an Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

Return:

Observable

An Observable that emits the result of applying the projection function (and the optional resultSelector) to each item emitted by the source Observable and merging the results of the Observables obtained from this transformation.

Example:

Map and flatten each letter to an Observable ticking every 1 second
var letters = Rx.Observable.of('a', 'b', 'c');
var result = letters.mergeMap(x =>
  Rx.Observable.interval(1000).map(i => x+i)
);
result.subscribe(x => console.log(x));

// Results in the following:
// a0
// b0
// c0
// a1
// b1
// c1
// continues to list a,b,c with respective ascending integers

public mergeMapTo(innerObservable: Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable source

Projects each source value to the same Observable which is merged multiple times in the output Observable.

It's like mergeMap, but maps each value always to the same inner Observable.

Maps each source value to the given Observable innerObservable regardless of the source value, and then merges those resulting Observables into one single Observable, which is the output Observable.

Params:

Name Type Attribute Description
innerObservable Observable

An Observable to replace each value from the source Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

Return:

Observable

An Observable that emits items from the given innerObservable (and optionally transformed through resultSelector) every time a value is emitted on the source Observable.

Example:

For each click event, start an interval Observable ticking every 1 second
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.mergeMapTo(Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

public mergeScan(project: *, seed: *, concurrent: *): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

Name Type Attribute Description
project *
seed *
concurrent *

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

public min(comparer: Function): Observable<R> source

The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when the source Observable completes it emits a single item: the item with the smallest value.

Params:

Name Type Attribute Description
comparer Function
  • optional

Optional comparer function that is used instead of the default to compare the value of two items.

Return:

Observable<R>

An Observable that emits the item with the smallest value.

Example:

Get the minimal value of a series of numbers
Rx.Observable.of(5, 4, 7, 2, 8)
  .min()
  .subscribe(x => console.log(x)); // -> 2

Use a comparer function to get the minimal item
interface Person {
  age: number,
  name: string
}
Observable.of<Person>({age: 7, name: 'Foo'},
                      {age: 5, name: 'Bar'},
                      {age: 9, name: 'Beer'})
          .min<Person>((a: Person, b: Person) => a.age < b.age ? -1 : 1)
          .subscribe((x: Person) => console.log(x.name)); // -> 'Bar'

public multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable source

Returns an Observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable that shares a single subscription to the underlying stream.

Params:

Name Type Attribute Description
subjectOrSubjectFactory Function | Subject

Factory function to create an intermediate subject through which the source sequence's elements will be multicast to the selector function, or a Subject to push source elements into.

selector Function
  • optional

Optional selector function that can use the multicasted source stream as many times as needed, without causing multiple subscriptions to the source stream. Subscribers to the given source will receive all notifications of the source from the time of the subscription forward.

Return:

Observable

an Observable that emits the results of invoking the selector on the items emitted by a ConnectableObservable that shares a single subscription to the underlying stream.

public observeOn(scheduler: *, delay: *): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

Name Type Attribute Description
scheduler *
delay *

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

public pairwise(): Observable<Array<T>> source

Groups pairs of consecutive emissions together and emits them as an array of two values.

Puts the current value and previous value together as an array, and emits that.

The Nth emission from the source Observable will cause the output Observable to emit an array [(N-1)th, Nth] of the previous and the current value, as a pair. For this reason, pairwise emits on the second and subsequent emissions from the source Observable, but not on the first emission, because there is no previous value in that case.

Return:

Observable<Array<T>>

An Observable of pairs (as arrays) of consecutive values from the source Observable.

Example:

On every click (starting from the second), emit the relative distance to the previous click
var clicks = Rx.Observable.fromEvent(document, 'click');
var pairs = clicks.pairwise();
var distance = pairs.map(pair => {
  var x0 = pair[0].clientX;
  var y0 = pair[0].clientY;
  var x1 = pair[1].clientX;
  var y1 = pair[1].clientY;
  return Math.sqrt(Math.pow(x0 - x1, 2) + Math.pow(y0 - y1, 2));
});
distance.subscribe(x => console.log(x));

public partition(predicate: function(value: T, index: number): boolean, thisArg: any): [Observable<T>, Observable<T>] source

Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don't satisfy the predicate.

It's like filter, but returns two Observables: one like the output of filter, and the other with values that did not pass the condition.

partition outputs an array with two Observables that partition the values from the source Observable through the given predicate function. The first Observable in that array emits source values for which the predicate argument returns true. The second Observable emits source values for which the predicate returns false. The first behaves like filter and the second behaves like filter with the predicate negated.

Params:

Name Type Attribute Description
predicate function(value: T, index: number): boolean

A function that evaluates each value emitted by the source Observable. If it returns true, the value is emitted on the first Observable in the returned array; if it returns false, the value is emitted on the second Observable in the array. The index parameter is the number i for the i-th source emission that has happened since the subscription, starting from the number 0.

thisArg any
  • optional

An optional argument to determine the value of this in the predicate function.

Return:

[Observable<T>, Observable<T>]

An array with two Observables: one with values that passed the predicate, and another with values that did not pass the predicate.

Example:

Partition click events into those on DIV elements and those elsewhere
var clicks = Rx.Observable.fromEvent(document, 'click');
var parts = clicks.partition(ev => ev.target.tagName === 'DIV');
var clicksOnDivs = parts[0];
var clicksElsewhere = parts[1];
clicksOnDivs.subscribe(x => console.log('DIV clicked: ', x));
clicksElsewhere.subscribe(x => console.log('Other clicked: ', x));

public pluck(properties: ...string): Observable source

Maps each source value (an object) to its specified nested property.

Like map, but meant only for picking one of the nested properties of every emitted object.

Given a list of strings describing a path to an object property, retrieves the value of a specified nested property from all values in the source Observable. If a property can't be resolved, it will return undefined for that value.

Params:

Name Type Attribute Description
properties ...string

The nested properties to pluck from each source value (an object).

Return:

Observable

Returns a new Observable of property values from the source values.

Example:

Map every click to the tagName of the clicked target element
var clicks = Rx.Observable.fromEvent(document, 'click');
var tagNames = clicks.pluck('target', 'tagName');
tagNames.subscribe(x => console.log(x));
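
The path lookup that pluck applies to each value can be sketched as a small helper. pluckPath is a hypothetical name used only for illustration; it assumes that a path which cannot be resolved yields undefined, as stated above.

```javascript
// Resolve a nested property path on one value (illustrative sketch).
function pluckPath(source, properties) {
  let current = source;
  for (const property of properties) {
    // Stop early when the path cannot be followed any further.
    if (current === null || current === undefined) {
      return undefined;
    }
    current = current[property];
  }
  return current;
}

// Hypothetical object shaped like a click event.
const clickLike = { target: { tagName: 'BUTTON' } };
console.log(pluckPath(clickLike, ['target', 'tagName']));        // → 'BUTTON'
console.log(pluckPath({ target: null }, ['target', 'tagName'])); // → undefined
```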

public publish(selector: Function): * source

Returns a ConnectableObservable, which is a variety of Observable that waits until its connect method is called before it begins emitting items to those Observers that have subscribed to it.

Params:

NameTypeAttributeDescription
selector Function
  • optional

An optional selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all notifications of the source from the time of the subscription on.

Return:

*

a ConnectableObservable that upon connection causes the source Observable to emit items to its Observers.

public publishBehavior(value: *): ConnectableObservable<T> source

Params:

NameTypeAttributeDescription
value *

public publishReplay(bufferSize: *, windowTime: *, scheduler: *): ConnectableObservable<T> source

Params:

NameTypeAttributeDescription
bufferSize *
windowTime *
scheduler *

public race(observables: ...Observable): Observable source

Returns an Observable that mirrors the first source Observable to emit an item, chosen from this Observable and the supplied Observables.

Params:

NameTypeAttributeDescription
observables ...Observable

The source Observables that race to be the first to emit.

Return:

Observable

an Observable that mirrors the output of the first Observable to emit an item.

public reduce(accumulator: function(acc: R, value: T): R, seed: R): Observable<R> source

Applies an accumulator function over the source Observable, and returns the accumulated result when the source completes, given an optional seed value.

Combines together all values emitted on the source, using an accumulator function that knows how to join a new source value into the accumulation from the past.

Like Array.prototype.reduce(), reduce applies an accumulator function against an accumulation and each value of the source Observable (from the past) to reduce it to a single value, emitted on the output Observable. Note that reduce will only emit one value, only when the source Observable completes. It is equivalent to applying operator scan followed by operator last.

Returns an Observable that applies a specified accumulator function to each item emitted by the source Observable. If a seed value is specified, then that value will be used as the initial value for the accumulator. If no seed value is specified, the first item of the source is used as the seed.

Params:

NameTypeAttributeDescription
accumulator function(acc: R, value: T): R

The accumulator function called on each source value.

seed R
  • optional

The initial accumulation value.

Return:

Observable<R>

An Observable that emits a single value that is the result of accumulating the values emitted by the source Observable.

Example:

Count the number of click events that happened in 5 seconds
var clicksInFiveSeconds = Rx.Observable.fromEvent(document, 'click')
  .takeUntil(Rx.Observable.interval(5000));
var ones = clicksInFiveSeconds.mapTo(1);
var seed = 0;
var count = ones.reduce((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));
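
The seed handling and the single final emission can be modeled on a plain array. This is a sketch under assumptions, not the RxJS implementation: reduceValues is a hypothetical helper, the array stands for a source that has already completed, and the treatment of an empty source (emit the seed if one was given, otherwise emit nothing) is an assumption that should be checked against the library.

```javascript
// Array model of reduce: returns the list of values the output would emit,
// which holds at most one value (illustrative sketch).
function reduceValues(values, accumulator, seed) {
  const hasSeed = arguments.length >= 3;
  if (values.length === 0) {
    // Assumption: an empty source emits the seed, or nothing without a seed.
    return hasSeed ? [seed] : [];
  }
  // Without a seed, the first source value starts the accumulation.
  let acc = hasSeed ? seed : values[0];
  for (let i = hasSeed ? 0 : 1; i < values.length; i++) {
    acc = accumulator(acc, values[i]);
  }
  return [acc];
}

const ones = [1, 1, 1, 1]; // four clicks mapped to 1
console.log(reduceValues(ones, (acc, one) => acc + one, 0)); // → [4]
console.log(reduceValues([5, 6, 7], (acc, x) => acc + x));   // → [18]
```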

public repeat(scheduler: Scheduler, count: number): Observable source

Returns an Observable that repeats the stream of items emitted by the source Observable at most count times, on a particular IScheduler.

Params:

NameTypeAttributeDescription
scheduler Scheduler
  • optional

the IScheduler to emit the items on.

count number
  • optional

the number of times the source Observable items are repeated, a count of 0 will yield an empty Observable.

Return:

Observable

an Observable that repeats the stream of items emitted by the source Observable at most count times.

public repeatWhen(notificationHandler: function, scheduler: Scheduler): Observable source

Returns an Observable that emits the same values as the source Observable, with the exception of a complete notification. When the source completes, a notification is emitted on the Observable passed to notificationHandler. If the Observable returned from notificationHandler calls complete or error, then repeatWhen calls complete or error on the child subscription. Otherwise, this Observable resubscribes to the source Observable, on a particular IScheduler.

Params:

NameTypeAttributeDescription
notificationHandler function

Receives an Observable of notifications with which a user can complete or error, aborting the repetition.

scheduler Scheduler

The IScheduler on which to subscribe to the source Observable.

Return:

Observable

The source Observable modified with repeat logic.

public retry(count: number): Observable source

Returns an Observable that mirrors the source Observable, resubscribing to it if it calls error. If the source Observable calls error, this method will resubscribe to the source Observable for a maximum of count resubscriptions rather than propagating the error call.

Any and all items emitted by the source Observable will be emitted by the resulting Observable, even those emitted during failed subscriptions. For example, if an Observable emits [1, 2] and then fails on the first subscription, but succeeds on the second and emits [1, 2, 3, 4, 5], then the complete stream of emissions and notifications would be: [1, 2, 1, 2, 3, 4, 5, complete].

Params:

NameTypeAttributeDescription
count number

The number of retry attempts before failing.

Return:

Observable

the source Observable modified with the retry logic.
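
The resubscription behaviour described for retry, including the [1, 2, 1, 2, 3, 4, 5, complete] example, can be traced with a small synchronous model. Everything here is hypothetical and for illustration only: retryModel, the subscribeOnce callback and its { values, failed } result shape are assumptions and not RxJS API.

```javascript
// Synchronous model of retry (illustrative sketch, not the RxJS code).
// subscribeOnce(attempt) describes one subscription to the source:
// the values it emitted and whether it ended with an error.
function retryModel(subscribeOnce, count) {
  const output = [];
  for (let attempt = 0; ; attempt++) {
    const result = subscribeOnce(attempt);
    // Values from failed subscriptions are still forwarded.
    output.push(...result.values);
    if (!result.failed) {
      output.push('complete');
      return output;
    }
    if (attempt >= count) {
      // Retries exhausted: the error is propagated.
      output.push('error');
      return output;
    }
    // Otherwise resubscribe instead of propagating the error.
  }
}

// Hypothetical source: fails after [1, 2] once, then succeeds.
const flaky = attempt =>
  attempt === 0
    ? { values: [1, 2], failed: true }
    : { values: [1, 2, 3, 4, 5], failed: false };

console.log(retryModel(flaky, 1)); // → [1, 2, 1, 2, 3, 4, 5, 'complete']
console.log(retryModel(flaky, 0)); // → [1, 2, 'error']
```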

public retryWhen(notificationHandler: function, scheduler: Scheduler): Observable source

Returns an Observable that emits the same values as the source Observable, with the exception of an error. When the source errors, the error is emitted on the Observable passed to notificationHandler. If the Observable returned from notificationHandler calls complete or error, then retryWhen calls complete or error on the child subscription. Otherwise, this Observable resubscribes to the source Observable, on a particular IScheduler.

Params:

NameTypeAttributeDescription
notificationHandler function

Receives an Observable of notifications with which a user can complete or error, aborting the retry.

scheduler Scheduler

The IScheduler on which to subscribe to the source Observable.

Return:

Observable

The source Observable modified with retry logic.

public sample(notifier: Observable<any>): Observable<T> source

Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.

It's like sampleTime, but samples whenever the notifier Observable emits something.

Whenever the notifier Observable emits a value or completes, sample looks at the source Observable and emits whichever value it has most recently emitted since the previous sampling, unless the source has not emitted anything since the previous sampling. The notifier is subscribed to as soon as the output Observable is subscribed.

Params:

NameTypeAttributeDescription
notifier Observable<any>

The Observable to use for sampling the source Observable.

Return:

Observable<T>

An Observable that emits the results of sampling the values emitted by the source Observable whenever the notifier Observable emits a value or completes.

Example:

On every click, sample the most recent "seconds" timer
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = seconds.sample(clicks);
result.subscribe(x => console.log(x));
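
The rule "emit the latest source value, but only if the source has emitted since the previous sampling" can be traced on timestamped events. This is a sketch under assumptions: sampleModel and the { t, value } event shape are hypothetical, times are in milliseconds, and a source value arriving at exactly the same time as a notifier tick is counted as arriving before it.

```javascript
// Timeline model of sample (illustrative sketch, not the RxJS code).
// sourceEvents: [{ t, value }] sorted by time; notifierTimes: sorted times.
function sampleModel(sourceEvents, notifierTimes) {
  const emitted = [];
  let next = 0; // index of the first source event not yet consumed
  for (const tick of notifierTimes) {
    let latest;
    let hasValue = false;
    // Consume everything the source emitted since the previous sampling.
    while (next < sourceEvents.length && sourceEvents[next].t <= tick) {
      latest = sourceEvents[next].value;
      hasValue = true;
      next++;
    }
    // Nothing new since the last sampling means nothing is emitted.
    if (hasValue) {
      emitted.push(latest);
    }
  }
  return emitted;
}

// A "seconds" timer and three hypothetical clicks.
const seconds = [{ t: 1000, value: 0 }, { t: 2000, value: 1 }, { t: 3000, value: 2 }];
const clickTimes = [2500, 2700, 3500];
console.log(sampleModel(seconds, clickTimes)); // → [1, 2]
```

The click at 2700 produces no output because the timer did not tick between 2500 and 2700.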

public sampleTime(period: number, scheduler: Scheduler): Observable<T> source

Emits the most recently emitted value from the source Observable within periodic time intervals.

Samples the source Observable at periodic time intervals, emitting what it samples.

sampleTime periodically looks at the source Observable and emits whichever value it has most recently emitted since the previous sampling, unless the source has not emitted anything since the previous sampling. The sampling happens periodically in time every period milliseconds (or the time unit defined by the optional scheduler argument). The sampling starts as soon as the output Observable is subscribed.

Params:

NameTypeAttributeDescription
period number

The sampling period expressed in milliseconds or the time unit determined internally by the optional scheduler.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for managing the timers that handle the sampling.

Return:

Observable<T>

An Observable that emits the results of sampling the values emitted by the source Observable at the specified time interval.

Example:

Every second, emit the most recent click at most once
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.sampleTime(1000);
result.subscribe(x => console.log(x));

public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R> source

Applies an accumulator function over the source Observable, and returns each intermediate result, with an optional seed value.

It's like reduce, but emits the current accumulation whenever the source emits a value.

Combines together all values emitted on the source, using an accumulator function that knows how to join a new source value into the accumulation from the past. It is similar to reduce, but emits the intermediate accumulations.

Returns an Observable that applies a specified accumulator function to each item emitted by the source Observable. If a seed value is specified, then that value will be used as the initial value for the accumulator. If no seed value is specified, the first item of the source is used as the seed.

Params:

NameTypeAttributeDescription
accumulator function(acc: R, value: T, index: number): R

The accumulator function called on each source value.

seed T | R
  • optional

The initial accumulation value.

Return:

Observable<R>

An observable of the accumulated values.

Example:

Count the number of click events
var clicks = Rx.Observable.fromEvent(document, 'click');
var ones = clicks.mapTo(1);
var seed = 0;
var count = ones.scan((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));
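
The effect of the optional seed on the intermediate results can be seen in a plain-array model. scanValues is a hypothetical helper for illustration, not the RxJS implementation; for simplicity it passes only the accumulation and the value to the accumulator and omits the index argument.

```javascript
// Array model of scan: returns every intermediate accumulation
// (illustrative sketch).
function scanValues(values, accumulator, seed) {
  const hasSeed = arguments.length >= 3;
  const emitted = [];
  let acc = seed;
  values.forEach((value, position) => {
    if (position === 0 && !hasSeed) {
      // No seed: the first source value starts the accumulation as is.
      acc = value;
    } else {
      acc = accumulator(acc, value);
    }
    emitted.push(acc);
  });
  return emitted;
}

const add = (acc, x) => acc + x;
console.log(scanValues([5, 10, 20], add, 100)); // → [105, 115, 135]
console.log(scanValues([5, 10, 20], add));      // → [5, 15, 35]
```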

public sequenceEqual(compareTo: Observable, comparor: function): Observable source

Compares all values of two observables in sequence using an optional comparor function and returns an observable of a single boolean value representing whether or not the two sequences are equal.

Checks to see if all values emitted by both observables are equal, in order.

sequenceEqual subscribes to two observables and buffers incoming values from each observable. Whenever either observable emits a value, the value is buffered and the buffers are shifted and compared from the bottom up; if any value pair doesn't match, the returned observable will emit false and complete. If one of the observables completes, the operator will wait for the other observable to complete; if the other observable emits before completing, the returned observable will emit false and complete. If one observable never completes or emits after the other completes, the returned observable will never complete.

Params:

NameTypeAttributeDescription
compareTo Observable

the observable sequence to compare the source sequence to.

comparor function
  • optional

An optional function to compare each value pair.

Return:

Observable

An Observable of a single boolean value representing whether or not the values emitted by both observables were equal in sequence.

Example:

Figure out if the Konami code matches
var code = Rx.Observable.from([
 "ArrowUp",
 "ArrowUp",
 "ArrowDown",
 "ArrowDown",
 "ArrowLeft",
 "ArrowRight",
 "ArrowLeft",
 "ArrowRight",
 "KeyB",
 "KeyA",
 "Enter" // no start key, clearly.
]);

var keys = Rx.Observable.fromEvent(document, 'keyup')
 .map(e => e.code);
var matches = keys.bufferCount(11, 1)
 .mergeMap(
   last11 =>
     Rx.Observable.from(last11)
       .sequenceEqual(code)
  );
matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));
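
For two sources that have both completed, the pairwise comparison reduces to the following array model. It is a simplified sketch: sequenceEqualModel is a hypothetical helper, and it does not model the buffering over time or the case where one source never completes.

```javascript
// Array model of sequenceEqual for two completed sequences
// (illustrative sketch, not the RxJS code).
function sequenceEqualModel(first, second, comparor = (a, b) => a === b) {
  const shared = Math.min(first.length, second.length);
  for (let i = 0; i < shared; i++) {
    // The first mismatching pair decides the result.
    if (!comparor(first[i], second[i])) {
      return false;
    }
  }
  // A sequence with extra values after the other ended is not equal.
  return first.length === second.length;
}

const expected = ['ArrowUp', 'ArrowDown', 'KeyB', 'KeyA'];
console.log(sequenceEqualModel(['ArrowUp', 'ArrowDown', 'KeyB', 'KeyA'], expected)); // → true
console.log(sequenceEqualModel(['ArrowUp', 'ArrowDown', 'KeyA', 'KeyB'], expected)); // → false
console.log(sequenceEqualModel(['ArrowUp', 'ArrowDown'], expected));                 // → false

// A custom comparor, here ignoring letter case.
const ignoreCase = (a, b) => a.toLowerCase() === b.toLowerCase();
console.log(sequenceEqualModel(['keyb', 'KEYA'], ['KeyB', 'KeyA'], ignoreCase));     // → true
```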

public share(): Observable<T> source

Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream hot. This is an alias for .publish().refCount().

Return:

Observable<T>

an Observable that upon connection causes the source Observable to emit items to its Observers

public single(predicate: Function): Observable<T> source

Returns an Observable that emits the single item emitted by the source Observable that matches a specified predicate, if that Observable emits one such item. If the source Observable emits more than one such item or no such items, notify of an IllegalArgumentException or NoSuchElementException respectively.

Params:

NameTypeAttributeDescription
predicate Function

A predicate function to evaluate items emitted by the source Observable.

Return:

Observable<T>

an Observable that emits the single item emitted by the source Observable that matches the predicate.

Throw:

EmptyError

Delivers an EmptyError to the Observer's error callback if the Observable completes before any next notification was sent.

public skip(n: Number): Observable source

Returns an Observable that skips the first n items emitted by the source Observable.

Params:

NameTypeAttributeDescription
n Number

The number of items emitted by the source Observable that should be skipped.

Return:

Observable

an Observable that skips values emitted by the source Observable.

public skipUntil(notifier: Observable): Observable<T> source

Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.

Params:

NameTypeAttributeDescription
notifier Observable

The second Observable that has to emit an item before the source Observable's elements begin to be mirrored by the resulting Observable.

Return:

Observable<T>

an Observable that skips items from the source Observable until the second Observable emits an item, then emits the remaining items.

public skipWhile(predicate: Function): Observable<T> source

Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.

Params:

NameTypeAttributeDescription
predicate Function

a function to test each item emitted from the source Observable.

Return:

Observable<T>

an Observable that begins emitting items emitted by the source Observable when the specified predicate becomes false.

public startWith(values: Values): Observable source

Returns an Observable that emits the items in a specified Iterable before it begins to emit items emitted by the source Observable.

Params:

NameTypeAttributeDescription
values Values

An Iterable that contains the items you want the modified Observable to emit first.

Return:

Observable

an Observable that emits the items in the specified Iterable and then emits the items emitted by the source Observable.

public subscribeOn(scheduler: Scheduler): Observable<T> source

Asynchronously subscribes Observers to this Observable on the specified IScheduler.

Params:

NameTypeAttributeDescription
scheduler Scheduler

The IScheduler to perform subscription actions on.

Return:

Observable<T>

the source Observable modified so that its subscriptions happen on the specified IScheduler.

public switch(): Observable<T> source

Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables.

Flattens an Observable-of-Observables by dropping the previous inner Observable once a new one appears.

switch subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, the output Observable subscribes to the inner Observable and begins emitting the items emitted by that. So far, it behaves like mergeAll. However, when a new inner Observable is emitted, switch unsubscribes from the earlier-emitted inner Observable and subscribes to the new inner Observable and begins emitting items from it. It continues to behave like this for subsequent inner Observables.

Return:

Observable<T>

An Observable that emits the items emitted by the Observable most recently emitted by the source Observable.

Example:

Rerun an interval Observable on every click event
var clicks = Rx.Observable.fromEvent(document, 'click');
// Each click event is mapped to an Observable that ticks every second
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var switched = higherOrder.switch();
// The outcome is that `switched` is essentially a timer that restarts
// on every click. The interval Observables from older clicks do not merge
// with the current interval Observable.
switched.subscribe(x => console.log(x));
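
The "drop the previous inner Observable once a new one appears" rule can be traced on a timeline. This is a sketch under assumptions: switchModel and the { start, events } shape are hypothetical, all times are absolute milliseconds, and an inner value arriving at exactly the moment the next inner Observable starts is treated as dropped.

```javascript
// Timeline model of switch (illustrative sketch, not the RxJS code).
// inners: [{ start, events: [{ t, value }] }], sorted by start time.
function switchModel(inners) {
  const emitted = [];
  inners.forEach((inner, i) => {
    // An inner Observable is only listened to until the next one starts.
    const nextStart = i + 1 < inners.length ? inners[i + 1].start : Infinity;
    for (const event of inner.events) {
      if (event.t < nextStart) {
        emitted.push(event.value);
      }
    }
  });
  return emitted;
}

// Two hypothetical clicks at 0 ms and 2500 ms, each starting a 1-second timer.
const firstTimer = { start: 0, events: [{ t: 1000, value: 'a0' }, { t: 2000, value: 'a1' }, { t: 3000, value: 'a2' }] };
const secondTimer = { start: 2500, events: [{ t: 3500, value: 'b0' }, { t: 4500, value: 'b1' }] };
console.log(switchModel([firstTimer, secondTimer])); // → ['a0', 'a1', 'b0', 'b1']
```

The value 'a2' never appears because the first timer is dropped when the second click arrives at 2500 ms.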

public switchMap(project: function(value: T, ?index: number): Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.

Maps each value to an Observable, then flattens all of these inner Observables using switch.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns a so-called "inner" Observable. Each time it observes one of these inner Observables, the output Observable begins emitting the items emitted by that inner Observable. When a new inner Observable is emitted, switchMap stops emitting items from the earlier-emitted inner Observable and begins emitting items from the new one. It continues to behave like this for subsequent inner Observables.

Params:

NameTypeAttributeDescription
project function(value: T, ?index: number): Observable

A function that, when applied to an item emitted by the source Observable, returns an Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

Return:

Observable

An Observable that emits the result of applying the projection function (and the optional resultSelector) to each item emitted by the source Observable and taking only the values from the most recently projected inner Observable.

Example:

Rerun an interval Observable on every click event
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.switchMap((ev) => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

public switchMapTo(innerObservable: Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

Projects each source value to the same Observable which is flattened multiple times with switch in the output Observable.

It's like switchMap, but maps each value always to the same inner Observable.

Maps each source value to the given Observable innerObservable regardless of the source value, and then flattens those resulting Observables into one single Observable, which is the output Observable. The output Observable emits values only from the most recently emitted instance of innerObservable.

Params:

NameTypeAttributeDescription
innerObservable Observable

An Observable to replace each value from the source Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

Return:

Observable

An Observable that emits items from the given innerObservable (and optionally transformed through resultSelector) every time a value is emitted on the source Observable, and taking only the values from the most recently projected inner Observable.

Example:

Rerun an interval Observable on every click event
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.switchMapTo(Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

public take(count: number): Observable<T> source

Emits only the first count values emitted by the source Observable.

Takes the first count values from the source, then completes.

take returns an Observable that emits only the first count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. After that, it completes, regardless of whether the source completes.

Params:

NameTypeAttributeDescription
count number

The maximum number of next values to emit.

Return:

Observable<T>

An Observable that emits only the first count values emitted by the source Observable, or all of the values from the source if the source emits fewer than count values.

Throw:

ArgumentOutOfRangeError

When using take(i), it delivers an ArgumentOutOfRangeError to the Observer's error callback if i < 0.

Example:

Take the first 5 seconds of an infinite 1-second interval Observable
var interval = Rx.Observable.interval(1000);
var five = interval.take(5);
five.subscribe(x => console.log(x));

public takeLast(count: number): Observable<T> source

Emits only the last count values emitted by the source Observable.

Remembers the latest count values, then emits those only when the source completes.

takeLast returns an Observable that emits at most the last count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. This operator must wait until the complete notification emission from the source in order to emit the next values on the output Observable, because otherwise it is impossible to know whether or not more values will be emitted on the source. For this reason, all values are emitted synchronously, followed by the complete notification.

Params:

NameTypeAttributeDescription
count number

The maximum number of values to emit from the end of the sequence of values emitted by the source Observable.

Return:

Observable<T>

An Observable that emits at most the last count values emitted by the source Observable.

Throw:

ArgumentOutOfRangeError

When using takeLast(i), it delivers an ArgumentOutOfRangeError to the Observer's error callback if i < 0.

Example:

Take the last 3 values of an Observable with many values
var many = Rx.Observable.range(1, 100);
var lastThree = many.takeLast(3);
lastThree.subscribe(x => console.log(x));
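
Remembering only the latest count values while the source is still running can be sketched with a bounded buffer. takeLastValues is a hypothetical helper, the array stands for a source that has completed, and a standard RangeError is thrown here as a stand-in for the ArgumentOutOfRangeError that the operator delivers.

```javascript
// Bounded-buffer model of takeLast (illustrative sketch, not the RxJS code).
function takeLastValues(values, count) {
  if (count < 0) {
    // Stand-in for ArgumentOutOfRangeError in this sketch.
    throw new RangeError('count must not be negative');
  }
  const buffer = [];
  for (const value of values) {
    buffer.push(value);
    // Keep at most `count` of the most recent values.
    if (buffer.length > count) {
      buffer.shift();
    }
  }
  // Only once the source is done are the buffered values emitted.
  return buffer;
}

const many = [];
for (let i = 1; i <= 100; i++) {
  many.push(i);
}
console.log(takeLastValues(many, 3));   // → [98, 99, 100]
console.log(takeLastValues([1, 2], 5)); // → [1, 2]
```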

public takeUntil(notifier: Observable): Observable<T> source

Emits the values emitted by the source Observable until a notifier Observable emits a value.

Lets values pass until a second Observable, notifier, emits something. Then, it completes.

takeUntil subscribes and begins mirroring the source Observable. It also monitors a second Observable, notifier, that you provide. If the notifier emits a value or a complete notification, the output Observable stops mirroring the source Observable and completes.

Params:

NameTypeAttributeDescription
notifier Observable

The Observable whose first emitted value will cause the output Observable of takeUntil to stop emitting values from the source Observable.

Return:

Observable<T>

An Observable that emits the values from the source Observable until such time as notifier emits its first value.

Example:

Tick every second until the first click happens
var interval = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = interval.takeUntil(clicks);
result.subscribe(x => console.log(x));

public takeWhile(predicate: function(value: T, index: number): boolean): Observable<T> source

Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.

Takes values from the source only while they pass the condition given. When the first value does not satisfy, it completes.

takeWhile subscribes and begins mirroring the source Observable. Each value emitted on the source is given to the predicate function which returns a boolean, representing a condition to be satisfied by the source values. The output Observable emits the source values until such time as the predicate returns false, at which point takeWhile stops mirroring the source Observable and completes the output Observable.

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number): boolean

A function that evaluates a value emitted by the source Observable and returns a boolean. Also takes the (zero-based) index as the second argument.

Return:

Observable<T>

An Observable that emits the values from the source Observable so long as each value satisfies the condition defined by the predicate, then completes.

Example:

Emit click events only while the clientX property is greater than 200
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.takeWhile(ev => ev.clientX > 200);
result.subscribe(x => console.log(x));
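
A point that is easy to miss is that takeWhile stops for good at the first failing value, unlike filter, which would let later matching values through. A plain-array sketch makes the difference visible; takeWhileValues and the sample clientX numbers are hypothetical.

```javascript
// Array model of takeWhile (illustrative sketch, not the RxJS code).
function takeWhileValues(values, predicate) {
  const emitted = [];
  for (let index = 0; index < values.length; index++) {
    // The first value that fails the predicate ends the output.
    if (!predicate(values[index], index)) {
      break;
    }
    emitted.push(values[index]);
  }
  return emitted;
}

// Hypothetical clientX positions of successive clicks.
const clientXs = [250, 300, 150, 400];
console.log(takeWhileValues(clientXs, x => x > 200)); // → [250, 300]
console.log(clientXs.filter(x => x > 200));           // → [250, 300, 400]
```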

public throttle(durationSelector: function(value: T): Observable | Promise): Observable<T> source

Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.

It's like throttleTime, but the silencing duration is determined by a second Observable.

throttle emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, it is forwarded to the output Observable, and then the timer is enabled by calling the durationSelector function with the source value, which returns the "duration" Observable. When the duration Observable emits a value or completes, the timer is disabled, and this process repeats for the next source value.

Params:

NameTypeAttributeDescription
durationSelector function(value: T): Observable | Promise

A function that receives a value from the source Observable, for computing the silencing duration for each source value, returned as an Observable or a Promise.

Return:

Observable<T>

An Observable that performs the throttle operation to limit the rate of emissions from the source.

Example:

Emit clicks at a rate of at most one click per second
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttle(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

public throttleTime(duration: number, scheduler: Scheduler): Observable<T> source

Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.

Lets a value pass, then ignores source values for the next duration milliseconds.

throttleTime emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, it is forwarded to the output Observable, and then the timer is enabled. After duration milliseconds (or the time unit determined internally by the optional scheduler) have passed, the timer is disabled, and this process repeats for the next source value. Optionally takes an IScheduler for managing timers.

Params:

NameTypeAttributeDescription
duration number

Time to wait before emitting another value after emitting the last value, measured in milliseconds or the time unit determined internally by the optional scheduler.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for managing the timers that handle the throttling.

Return:

Observable<T>

An Observable that performs the throttle operation to limit the rate of emissions from the source.

Example:

Emit clicks at a rate of at most one click per second
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttleTime(1000);
result.subscribe(x => console.log(x));
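
The enabled/disabled timer described above can be traced on timestamped events. This is a sketch under assumptions: throttleTimeModel and the { t, value } event shape are hypothetical, times are in milliseconds, and a value arriving at exactly the moment the timer expires is treated as allowed through.

```javascript
// Timeline model of throttleTime (illustrative sketch, not the RxJS code).
// events: [{ t, value }] sorted by time; duration in the same time unit.
function throttleTimeModel(events, duration) {
  const emitted = [];
  let silentUntil = -Infinity; // the timer starts out disabled
  for (const event of events) {
    if (event.t >= silentUntil) {
      // Timer disabled: forward the value and enable the timer.
      emitted.push(event.value);
      silentUntil = event.t + duration;
    }
    // Timer enabled: the value is ignored.
  }
  return emitted;
}

// Hypothetical click times.
const clicks = [
  { t: 0, value: 'a' }, { t: 300, value: 'b' }, { t: 900, value: 'c' },
  { t: 1200, value: 'd' }, { t: 1500, value: 'e' }, { t: 2300, value: 'f' }
];
console.log(throttleTimeModel(clicks, 1000)); // → ['a', 'd', 'f']
```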

public timeInterval(scheduler: *): Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
scheduler *

Return:

Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T>

public timeout(due: number, scheduler: Scheduler): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
due number
scheduler Scheduler
  • optional

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

public timeoutWith(due: *, withObservable: *, scheduler: *): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
due *
withObservable *
scheduler *

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

public timestamp(scheduler: *): Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
scheduler *

Return:

Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T>

public toArray(): Observable<any[]> | WebSocketSubject<T> | Observable<T> source

Return:

Observable<any[]> | WebSocketSubject<T> | Observable<T>

public toPromise(PromiseCtor: *): Promise<T> source

Params:

NameTypeAttributeDescription
PromiseCtor *

Return:

Promise<T>

public window(windowBoundaries: Observable<any>): Observable<Observable<T>> source

Branch out the source Observable values as a nested Observable whenever windowBoundaries emits.

It's like buffer, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits connected, non-overlapping windows. It emits the current window and opens a new one whenever the Observable windowBoundaries emits an item. Because each window is an Observable, the output is a higher-order Observable.

Params:

NameTypeAttributeDescription
windowBoundaries Observable<any>

An Observable that completes the previous window and starts a new window.

Return:

Observable<Observable<T>>

An Observable of windows, which are Observables emitting values of the source Observable.

Example:

In every window of 1 second each, emit at most 2 click events
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var result = clicks.window(interval)
  .map(win => win.take(2)) // each window has at most 2 emissions
  .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));

public windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>> source

Branch out the source Observable values as a nested Observable with each nested Observable emitting at most windowSize values.

It's like bufferCount, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits windows every startWindowEvery items, each containing no more than windowSize items. When the source Observable completes or encounters an error, the output Observable emits the current window and propagates the notification from the source Observable. If startWindowEvery is not provided, a new window is started at the beginning of the source and again each time the current window completes with windowSize items.

Params:

Name  Type  Attribute  Description
windowSize number

The maximum number of values emitted by each window.

startWindowEvery number
  • optional

Interval at which to start a new window. For example, if startWindowEvery is 2, then a new window will be started on every other value from the source. A new window is started at the beginning of the source by default.

Return:

Observable<Observable<T>>

An Observable of windows, which in turn are Observables of values.

Example:

Ignore every 3rd click event, starting from the first one
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowCount(3)
  .map(win => win.skip(1)) // skip first of every 3 clicks
  .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));

Ignore every 3rd click event, starting from the third one
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowCount(2, 3)
  .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));
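
How windowSize and startWindowEvery interact is easier to check on a finite list. The following is a minimal sketch in plain JavaScript, not the library's implementation: windowCountModel is a hypothetical helper that works on arrays, and it leaves out any empty trailing window the operator may open.

```javascript
// Plain-array model of windowCount(windowSize, startWindowEvery).
// Hypothetical helper for illustration only; it is not part of RxJS.
function windowCountModel(values, windowSize, startWindowEvery = 0) {
  // When startWindowEvery is not given, a new window starts every windowSize values.
  const every = startWindowEvery > 0 ? startWindowEvery : windowSize;
  const windows = [];
  for (let start = 0; start < values.length; start += every) {
    windows.push(values.slice(start, start + windowSize));
  }
  return windows;
}

const clickNumbers = [1, 2, 3, 4, 5, 6, 7];

// Mirrors windowCount(3) followed by skip(1) on each window.
const skipFirstOfThree = [].concat(
  ...windowCountModel(clickNumbers, 3).map(w => w.slice(1))
);

// Mirrors windowCount(2, 3): windows of 2 values, started every 3 values.
const skipThird = [].concat(...windowCountModel(clickNumbers, 2, 3));

console.log(skipFirstOfThree); // [ 2, 3, 5, 6 ]
console.log(skipThird);        // [ 1, 2, 4, 5, 7 ]
```

With startWindowEvery larger than windowSize, the values that fall between windows (here 3 and 6) belong to no window and are dropped.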

public windowTime(windowTimeSpan: number, windowCreationInterval: number, scheduler: Scheduler): Observable<Observable<T>> source

Branch out the source Observable values as a nested Observable periodically in time.

It's like bufferTime, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable starts a new window periodically, as determined by the windowCreationInterval argument. It emits each window after a fixed timespan, specified by the windowTimeSpan argument. When the source Observable completes or encounters an error, the output Observable emits the current window and propagates the notification from the source Observable. If windowCreationInterval is not provided, the output Observable starts a new window when the previous window of duration windowTimeSpan completes.

Params:

Name  Type  Attribute  Description
windowTimeSpan number

The amount of time to fill each window.

windowCreationInterval number
  • optional

The interval at which to start new windows.

scheduler Scheduler
  • optional
  • default: async

The scheduler on which to schedule the intervals that determine window boundaries.

Return:

Observable<Observable<T>>

An observable of windows, which in turn are Observables.

Example:

In every window of 1 second each, emit at most 2 click events
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowTime(1000)
  .map(win => win.take(2)) // each window has at most 2 emissions
  .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));

Every 5 seconds start a window 1 second long, and emit at most 2 click events per window
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowTime(1000, 5000)
  .map(win => win.take(2)) // each window has at most 2 emissions
  .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));
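
The relationship between windowTimeSpan and windowCreationInterval can be traced on timestamped data. The following is a minimal sketch in plain JavaScript, not the library's implementation: windowTimeModel, the {t, value} event shape, and the endTime parameter (standing in for the moment the source completes) are hypothetical, and each window is assumed to include its start time and exclude its end time.

```javascript
// Plain-array model of windowTime(windowTimeSpan, windowCreationInterval).
// Hypothetical helper for illustration only; it is not part of RxJS.
function windowTimeModel(events, endTime, span, creationInterval = span) {
  const windows = [];
  // A window opens at time 0 and then every creationInterval ms.
  for (let start = 0; start < endTime; start += creationInterval) {
    windows.push(
      events
        .filter(ev => ev.t >= start && ev.t < start + span)
        .map(ev => ev.value)
    );
  }
  return windows;
}

const timedClicks = [
  { t: 100, value: 'a' }, { t: 200, value: 'b' }, { t: 300, value: 'c' },
  { t: 1200, value: 'd' },
  { t: 5100, value: 'e' }, { t: 5200, value: 'f' }, { t: 5300, value: 'g' },
  { t: 6500, value: 'h' },
];

// Mirrors windowTime(1000, 5000) followed by take(2) on each window.
const timeWindows = windowTimeModel(timedClicks, 10000, 1000, 5000);
const timeResult = [].concat(...timeWindows.map(w => w.slice(0, 2)));

console.log(timeWindows); // [ [ 'a', 'b', 'c' ], [ 'e', 'f', 'g' ] ]
console.log(timeResult);  // [ 'a', 'b', 'e', 'f' ]
```

Clicks 'd' and 'h' arrive while no window is open, so they never reach the output.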

public windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>> source

Branch out the source Observable values as a nested Observable starting from an emission from openings and ending when the output of closingSelector emits.

It's like bufferToggle, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits windows that contain those items emitted by the source Observable between the time when the openings Observable emits an item and when the Observable returned by closingSelector emits an item.

Params:

Name  Type  Attribute  Description
openings Observable<O>

An observable of notifications to start new windows.

closingSelector function(value: O): Observable

A function that takes the value emitted by the openings observable and returns an Observable, which, when it emits (either next or complete), signals that the associated window should complete.

Return:

Observable<Observable<T>>

An observable of windows, which in turn are Observables.

Example:

Every other second, emit the click events from the next 500ms
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var result = clicks.windowToggle(openings, i =>
  i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
).mergeAll();
result.subscribe(x => console.log(x));
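
The pairing of each opening with its own closing notifier can be traced on timestamped data. The following is a minimal sketch in plain JavaScript, not the library's implementation: windowToggleModel and the {t, value} shape are hypothetical, and the closing Observable is reduced to a delay in milliseconds, where 0 stands for a notifier that completes immediately (as Rx.Observable.empty() does).

```javascript
// Plain-array model of windowToggle(openings, closingSelector).
// Hypothetical helper for illustration only; it is not part of RxJS.
function windowToggleModel(events, openings, closingDelay) {
  return openings.map(opening => {
    // Each opening value is passed to the selector, which decides
    // how long that particular window stays open.
    const closeAt = opening.t + closingDelay(opening.value);
    return events
      .filter(ev => ev.t >= opening.t && ev.t < closeAt)
      .map(ev => ev.value);
  });
}

const toggleClicks = [
  { t: 1100, value: 'a' }, { t: 2100, value: 'b' }, { t: 2400, value: 'c' },
  { t: 2700, value: 'd' }, { t: 3200, value: 'e' }, { t: 4499, value: 'f' },
];

// Models interval(1000): value i is emitted at (i + 1) * 1000 ms.
const openingTicks = [0, 1, 2, 3].map(i => ({ t: (i + 1) * 1000, value: i }));

// Odd openings keep their window open for 500 ms; even ones close at once.
const toggled = windowToggleModel(toggleClicks, openingTicks, i => (i % 2 ? 500 : 0));
const toggledFlat = [].concat(...toggled);

console.log(toggled);     // [ [], [ 'b', 'c' ], [], [ 'f' ] ]
console.log(toggledFlat); // [ 'b', 'c', 'f' ]
```

Windows in this model may overlap if a closing delay exceeds the gap between openings. In that case a value would appear in more than one window, and concatenating the windows would no longer preserve arrival order.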

public windowWhen(closingSelector: function(): Observable): Observable<Observable<T>> source

Branch out the source Observable values as a nested Observable using a factory function of closing Observables to determine when to start a new window.

It's like bufferWhen, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits connected, non-overlapping windows. It emits the current window and opens a new one whenever the Observable produced by the specified closingSelector function emits an item. The first window is opened immediately when subscribing to the output Observable.

Params:

Name  Type  Attribute  Description
closingSelector function(): Observable

A function that takes no arguments and returns an Observable that signals (on either next or complete) when to close the previous window and start a new one.

Return:

Observable<Observable<T>>

An observable of windows, which in turn are Observables.

Example:

Emit only the first two click events in every window of [1-5] random seconds
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks
  .windowWhen(() => Rx.Observable.interval(1000 + Math.random() * 4000))
  .map(win => win.take(2)) // each window has at most 2 emissions
  .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));
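
Because the factory is called again for every window, consecutive windows can have different lengths. The following is a minimal sketch in plain JavaScript, not the library's implementation: windowWhenModel, the {t, value} event shape, and endTime (standing in for source completion) are hypothetical, the closing Observable is reduced to a positive duration in milliseconds, and a fixed list of durations replaces Math.random() so the result is reproducible.

```javascript
// Plain-array model of windowWhen(closingSelector).
// Hypothetical helper for illustration only; it is not part of RxJS.
function windowWhenModel(events, endTime, nextDuration) {
  const windows = [];
  let start = 0; // the first window opens on subscription
  while (start < endTime) {
    // The factory is called once per window; it must return a positive duration.
    const end = start + nextDuration();
    windows.push(
      events.filter(ev => ev.t >= start && ev.t < end).map(ev => ev.value)
    );
    start = end; // the next window opens exactly when this one closes
  }
  return windows;
}

const whenClicks = [
  { t: 100, value: 'a' }, { t: 200, value: 'b' }, { t: 300, value: 'c' },
  { t: 1500, value: 'd' }, { t: 2500, value: 'e' }, { t: 3500, value: 'f' },
  { t: 4100, value: 'g' },
];

// Deterministic stand-in for a random duration between 1 and 5 seconds.
const durations = [1000, 3000, 2000];
let calls = 0;
const nextDuration = () => durations[calls++ % durations.length];

const whenWindows = windowWhenModel(whenClicks, 6000, nextDuration);
const whenResult = [].concat(...whenWindows.map(w => w.slice(0, 2)));

console.log(whenWindows); // [ [ 'a', 'b', 'c' ], [ 'd', 'e', 'f' ], [ 'g' ] ]
console.log(whenResult);  // [ 'a', 'b', 'd', 'e', 'g' ]
```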

public withLatestFrom(other: Observable, project: Function): Observable source

Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.

Whenever the source Observable emits a value, it computes a formula using that value plus the latest values from other input Observables, then emits the output of that formula.

withLatestFrom combines each value from the source Observable (the instance) with the latest values from the other input Observables only when the source emits a value, optionally using a project function to determine the value to be emitted on the output Observable. All input Observables must emit at least one value before the output Observable will emit a value.

Params:

Name  Type  Attribute  Description
other Observable

An input Observable to combine with the source Observable. More than one input Observable may be given as arguments.

project Function
  • optional

Projection function for combining values together. Receives all values in order of the Observables passed, where the first parameter is a value from the source Observable. (e.g. a.withLatestFrom(b, c, (a1, b1, c1) => a1 + b1 + c1)). If this is not passed, arrays will be emitted on the output Observable.

Return:

Observable

An Observable of projected values from the most recent values from each input Observable, or an array of the most recent values from each input Observable.

Example:

On every click event, emit an array with the latest timer event plus the click event
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var result = clicks.withLatestFrom(timer);
result.subscribe(x => console.log(x));
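
Two details of the description are easy to miss: only the source triggers output, and nothing is emitted until the other input has produced a value. The following is a minimal sketch in plain JavaScript, not the library's implementation: withLatestFromModel and the {t, value} event shape are hypothetical, it handles a single other input, and it assumes a value from the other input that coincides with a source value is seen first.

```javascript
// Plain-array model of source.withLatestFrom(other, project).
// Hypothetical helper for illustration only; it is not part of RxJS.
function withLatestFromModel(source, other, project = (a, b) => [a, b]) {
  const out = [];
  let next = 0;          // index of the next unread value in `other`
  let hasLatest = false; // becomes true once `other` has emitted
  let latest;
  for (const s of source) {
    // Catch up on everything `other` emitted at or before this source value.
    while (next < other.length && other[next].t <= s.t) {
      latest = other[next].value;
      hasLatest = true;
      next++;
    }
    // Source values that arrive before `other` has emitted are dropped.
    if (hasLatest) out.push(project(s.value, latest));
  }
  return out;
}

const sourceClicks = [
  { t: 500, value: 'a' }, { t: 1500, value: 'b' }, { t: 3200, value: 'c' },
];
// Models interval(1000): 0 at 1000 ms, 1 at 2000 ms, 2 at 3000 ms.
const ticks = [0, 1, 2].map(i => ({ t: (i + 1) * 1000, value: i }));

const pairs = withLatestFromModel(sourceClicks, ticks);
const projected = withLatestFromModel(sourceClicks, ticks, (click, tick) => click + tick);

console.log(pairs);     // [ [ 'b', 0 ], [ 'c', 2 ] ]
console.log(projected); // [ 'b0', 'c2' ]
```

Click 'a' produces no output because the timer has not emitted yet, and timer value 1 never appears because no click occurred while it was the latest value.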

public zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

Name  Type  Attribute  Description
project *

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

public zipProto(observables: *): Observable<R> source

Params:

Name  Type  Attribute  Description
observables *

Return:

Observable<R>