Home Manual Reference Source Test Repository
import {Observable} from '@reactivex/rxjs/es6/Observable.js'
public class | source

Observable

Indirect Subclass:

AnonymousSubject, AsyncSubject, BehaviorSubject, es6/operator/windowTime.js~CountedSubject, ReplaySubject

A representation of any set of values over any amount of time. This the most basic building block of RxJS.

Test:

Static Method Summary

Static Public Methods
public static

bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable

Converts a callback API to a function that returns an Observable.

public static

bindNodeCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable

Converts a Node.js-style callback API to a function that returns an Observable.

public static

combineLatest(observable1: ObservableInput, observable2: ObservableInput, project: function, scheduler: Scheduler): Observable

Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

public static

concat(input1: ObservableInput, input2: ObservableInput, scheduler: Scheduler): Observable

Creates an output Observable which sequentially emits all values from given Observable and then moves on to the next.

public static

create(onSubscription: function(observer: Observer): TeardownLogic): Observable

Creates a new Observable, that will execute the specified function when an Observer subscribes to it.

public static

Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer.

public static

empty(scheduler: Scheduler): Observable

Creates an Observable that emits no items to the Observer and immediately emits a complete notification.

public static

forkJoin(sources: *): any

public static

from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T>

Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.

public static

fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T>

Creates an Observable that emits events of a specific type coming from the given event target.

public static

fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function, signal?: any): void, selector: function(...args: any): T): Observable<T>

Creates an Observable from an API based on addHandler/removeHandler functions.

public static

fromPromise(promise: Promise<T>, scheduler: Scheduler): Observable<T>

Converts a Promise to an Observable.

public static

interval(period: number, scheduler: Scheduler): Observable

Creates an Observable that emits sequential numbers every specified interval of time, on a specified IScheduler.

public static

merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable

Creates an output Observable which concurrently emits all values from every given input Observable.

public static

Creates an Observable that emits no items to the Observer.

public static

of(values: ...T, scheduler: Scheduler): Observable<T>

Creates an Observable that emits some values you specify as arguments, immediately one after the other, and then emits a complete notification.

public static

range(start: number, count: number, scheduler: Scheduler): Observable

Creates an Observable that emits a sequence of numbers within a specified range.

public static

throw(error: any, scheduler: Scheduler): Observable

Creates an Observable that emits no items to the Observer and immediately emits an error notification.

public static

timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable

Creates an Observable that starts emitting after an initialDelay and emits ever increasing numbers after each period of time thereafter.

public static

webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject

Wrapper around the w3c-compatible WebSocket object provided by the browser.

public static

zip(observables: *): Observable<R>

Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.

Constructor Summary

Public Constructor
public

constructor(subscribe: Function)

Method Summary

Public Methods
public

An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable

public

audit(durationSelector: function(value: T): SubscribableOrPromise): Observable<T>

Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.

public

auditTime(duration: number, scheduler: Scheduler): Observable<T>

Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.

public

buffer(closingNotifier: Observable<any>): Observable<T[]>

Buffers the source Observable values until closingNotifier emits.

public

bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]>

Buffers the source Observable values until the size hits the maximum bufferSize given.

public

bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]>

Buffers the source Observable values for a specific time period.

public

bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]>

Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits.

public

bufferWhen(closingSelector: function(): Observable): Observable<T[]>

Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer.

public

catch(selector: function): Observable

Catches errors on the observable to be handled by returning a new observable or throwing an error.

public

Converts a higher-order Observable into a first-order Observable by waiting for the outer Observable to complete, then applying combineLatest.

public

Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

public

Creates an output Observable which sequentially emits all values from every given input Observable after the current Observable.

public

Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.

public

concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

public

concatMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to the same Observable which is merged multiple times in a serialized fashion on the output Observable.

public

count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable

Counts the number of emissions on the source and emits that number when the source completes.

public

debounce(durationSelector: function(value: T): SubscribableOrPromise): Observable

Emits a value from the source Observable only after a particular time span determined by another Observable has passed without another source emission.

public

debounceTime(dueTime: number, scheduler: Scheduler): Observable

Emits a value from the source Observable only after a particular time span has passed without another source emission.

public

defaultIfEmpty(defaultValue: any): Observable

Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.

public

delay(delay: number | Date, scheduler: Scheduler): Observable

Delays the emission of items from the source Observable by a given timeout or until a given Date.

public

delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable

Delays the emission of items from the source Observable by a given time span determined by the emissions of another Observable.

public

Converts an Observable of Notification objects into the emissions that they represent.

public

distinct(keySelector: function, flushes: Observable): Observable

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.

public

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.

public

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item, using a property accessed by using the key provided to check if the two items are distinct.

public

do(nextOrObserver: Observer | function, error: function, complete: function): Observable

Perform a side effect for every emission on the source Observable, but return an Observable that is identical to the source.

public

elementAt(index: number, defaultValue: T): Observable

Emits the single value at the specified index in a sequence of emissions from the source Observable.

public

every(predicate: function, thisArg: any): Observable

Returns an Observable that emits whether or not every item of the source satisfies the condition specified.

public

Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.

public

exhaustMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.

public

expand(project: function(value: T, index: number), concurrent: number, scheduler: Scheduler): Observable

Recursively projects each source value to an Observable which is merged in the output Observable.

public

filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable

Filter items emitted by the source Observable by only emitting those that satisfy a specified predicate.

public

find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T>

Emits only the first value emitted by the source Observable that meets some condition.

public

findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable

Emits only the index of the first value emitted by the source Observable that meets some condition.

public

first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R>

Emits only the first value (or the first value that meets some condition) emitted by the source Observable.

public

forEach(next: Function, PromiseCtor: PromiseConstructor): Promise

public

groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>>

Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.

public

Ignores all items emitted by the source Observable and only passes calls of complete or error.

public

If the source Observable is empty it returns an Observable that emits true, otherwise it emits false.

public

last(predicate: function): Observable

Returns an Observable that emits only the last item emitted by the source Observable.

public

letProto(func: *): Observable<R>

public

lift(operator: Operator): Observable

Creates a new Observable, with this Observable as the source, and the passed operator defined as the new observable's operator.

public

map(project: function(value: T, index: number): R, thisArg: any): Observable<R>

Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable.

public

mapTo(value: any): Observable

Emits the given constant value on the output Observable every time the source Observable emits a value.

public

Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.

public

max(comparer: Function): Observable

The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the largest value.

public

merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable

Creates an output Observable which concurrently emits all values from every given input Observable.

public

mergeAll(concurrent: number): Observable

Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.

public

mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

Projects each source value to an Observable which is merged in the output Observable.

public

mergeMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

Projects each source value to the same Observable which is merged multiple times in the output Observable.

public

mergeScan(accumulator: function(acc: R, value: T): Observable<R>, seed: *, concurrent: number): Observable<R>

Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable returned is merged into the output Observable.

public

min(comparer: Function): Observable<R>

The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the smallest value.

public

multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable

Returns an Observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable that shares a single subscription to the underlying stream.

public

observeOn(scheduler: *, delay: *): Observable<R> | WebSocketSubject<T> | Observable<T>

public

Groups pairs of consecutive emissions together and emits them as an array of two values.

public

partition(predicate: function(value: T, index: number): boolean, thisArg: any): [Observable<T>, Observable<T>]

Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don't satisfy the predicate.

It's like filter, but returns two Observables: one like the output of filter, and the other with values that did not pass the condition.

public

pluck(properties: ...string): Observable

Maps each source value (an object) to its specified nested property.

public

publish(selector: Function): *

Returns a ConnectableObservable, which is a variety of Observable that waits until its connect method is called before it begins emitting items to those Observers that have subscribed to it.

public
public
public

publishReplay(bufferSize: *, windowTime: *, scheduler: *): ConnectableObservable<T>

public

Returns an Observable that mirrors the first source Observable to emit an item from the combination of this Observable and supplied Observables.

public

reduce(accumulator: function(acc: R, value: T, index: number): R, seed: R): Observable<R>

Applies an accumulator function over the source Observable, and returns the accumulated result when the source completes, given an optional seed value.

public

Returns an Observable that repeats the stream of items emitted by the source Observable at most count times.

public

repeatWhen(notifier: function(notifications: Observable): Observable): Observable

Returns an Observable that mirrors the source Observable with the exception of a complete.

public

Returns an Observable that mirrors the source Observable with the exception of an error.

public

Returns an Observable that mirrors the source Observable with the exception of an error.

public

sample(notifier: Observable<any>): Observable<T>

Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.

public

sampleTime(period: number, scheduler: Scheduler): Observable<T>

Emits the most recently emitted value from the source Observable within periodic time intervals.

public

scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R>

Applies an accumulator function over the source Observable, and returns each intermediate result, with an optional seed value.

public

sequenceEqual(compareTo: Observable, comparor: function): Observable

Compares all values of two observables in sequence using an optional comparor function and returns an observable of a single boolean value representing whether or not the two sequences are equal.

public

Returns a new Observable that multicasts (shares) the original Observable.

public

single(predicate: Function): Observable<T>

Returns an Observable that emits the single item emitted by the source Observable that matches a specified predicate, if that Observable emits one such item.

public

Returns an Observable that skips the first count items emitted by the source Observable.

public

Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.

public

skipWhile(predicate: Function): Observable<T>

Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.

public

startWith(values: ...T, scheduler: Scheduler): Observable

Returns an Observable that emits the items you specify as arguments before it begins to emit items emitted by the source Observable.

public

subscribeOn(scheduler: Scheduler): Observable<T>

Asynchronously subscribes Observers to this Observable on the specified IScheduler.

public

Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables.

public

switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.

public

switchMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

Projects each source value to the same Observable which is flattened multiple times with switch in the output Observable.

public

take(count: number): Observable<T>

Emits only the first count values emitted by the source Observable.

public

takeLast(count: number): Observable<T>

Emits only the last count values emitted by the source Observable.

public

Emits the values emitted by the source Observable until a notifier Observable emits a value.

public

takeWhile(predicate: function(value: T, index: number): boolean): Observable<T>

Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.

public

throttle(durationSelector: function(value: T): SubscribableOrPromise): Observable<T>

Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.

public

throttleTime(duration: number, scheduler: Scheduler): Observable<T>

Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.

public

timeInterval(scheduler: *): Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T>

public

timeout(due: number, scheduler: Scheduler): Observable<R> | WebSocketSubject<T> | Observable<T>

public

timeoutWith(due: *, withObservable: *, scheduler: *): Observable<R> | WebSocketSubject<T> | Observable<T>

public

timestamp(scheduler: *): Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T>

public
public

toPromise(PromiseCtor: *): Promise<T>

Converts an Observable sequence to a ES2015 compliant promise.

public

window(windowBoundaries: Observable<any>): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable whenever windowBoundaries emits.

public

windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable with each nested Observable emitting at most windowSize values.

public

windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable starting from an emission from openings and ending when the output of closingSelector emits.

public

windowWhen(closingSelector: function(): Observable): Observable<Observable<T>>

Branch out the source Observable values as a nested Observable using a factory function of closing Observables to determine when to start a new window.

public

Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.

public

zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T>

public

zipProto(observables: *): Observable<R>

Static Public Methods

public static bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable source

Converts a callback API to a function that returns an Observable.

Give it a function f of type f(x, callback) and it will return a function g that when called as g(x) will output an Observable.

bindCallback is not an operator because its input and output are not Observables. The input is a function func with some parameters, but the last parameter must be a callback function that func calls when it is done.

The output of bindCallback is a function that takes the same parameters as func, except the last one (the callback). When the output function is called with arguments, it will return an Observable. If func function calls its callback with one argument, the Observable will emit that value. If on the other hand callback is called with multiple values, resulting Observable will emit an array with these arguments.

It is very important to remember, that input function func is not called when output function is, but rather when Observable returned by output function is subscribed. This means if func makes AJAX request, that request will be made every time someone subscribes to resulting Observable, but not before.

Optionally, selector function can be passed to bindObservable. That function takes the same arguments as callback, and returns value that will be emitted by Observable instead of callback parameters themselves. Even though by default multiple arguments passed to callback appear in the stream as array, selector function will be called with arguments directly, just as callback would. This means you can imagine default selector (when one is not provided explicitly) as function that aggregates all its arguments into array, or simply returns first argument, if there is only one.

Last optional parameter - Scheduler - can be used to control when call to func happens after someone subscribes to Observable, as well as when results passed to callback will be emitted. By default subscription to Observable calls func synchronously, but using Scheduler.async as last parameter will defer call to input function, just like wrapping that call in setTimeout with time 0 would. So if you use async Scheduler and call subscribe on output Observable, all function calls that are currently executing, will end before func is invoked.

When it comes to emitting results passed to callback, by default they are emitted immediately after func invokes callback. In particular, if callback is called synchronously, then subscription to resulting Observable will call next function synchronously as well. If you want to defer that call, using Scheduler.async will, again, do the job. This means that by using Scheduler.async you can, in a sense, ensure that func always calls its callback asynchronously, thus avoiding terrifying Zalgo.

Note that Observable created by output function will always emit only one value and then complete right after. Even if func calls callback multiple times, values from second and following calls will never appear in the stream. If you need to listen for multiple calls, you probably want to use fromEvent or fromEventPattern instead.

If func depends on some context (this property), that context will be set to the same context that output function has at call time. In particular, if func is called as method of some object, in order to preserve proper behaviour, it is recommended to set context of output function to that object as well, provided func is not already bound.

If input function calls its callback in "node style" (i.e. first argument to callback is optional error parameter signaling whether call failed or not), bindNodeCallback provides convenient error handling and probably is a better choice. bindCallback will treat such functions without any difference and error parameter (whether passed or not) will always be interpreted as regular callback argument.

Params:

NameTypeAttributeDescription
func function

Function with a callback as the last parameter.

selector function
  • optional

A function which takes the arguments from the callback and maps those to a value to emit on the output Observable.

scheduler Scheduler
  • optional

The scheduler on which to schedule the callbacks.

Return:

function(...params: *): Observable

A function which returns the Observable that delivers the same values the callback would deliver.

Example:

Convert jQuery's getJSON to an Observable API
// Suppose we have jQuery.getJSON('/my/url', callback)
var getJSONAsObservable = Rx.Observable.bindCallback(jQuery.getJSON);
var result = getJSONAsObservable('/my/url');
result.subscribe(x => console.log(x), e => console.error(e));
Receive array of arguments passed to callback
someFunction((a, b, c) => {
  console.log(a); // 5
  console.log(b); // 'some string'
  console.log(c); // {someProperty: 'someValue'}
});

const boundSomeFunction = Rx.Observable.bindCallback(someFunction);
boundSomeFunction.subscribe(values => {
  console.log(values) // [5, 'some string', {someProperty: 'someValue'}]
});
Use bindCallback with selector function
someFunction((a, b, c) => {
  console.log(a); // 'a'
  console.log(b); // 'b'
  console.log(c); // 'c'
});

const boundSomeFunction = Rx.Observable.bindCallback(someFunction, (a, b, c) => a + b + c);
boundSomeFunction.subscribe(value => {
  console.log(value) // 'abc'
});
Compare behaviour with and without async Scheduler
function iCallMyCallbackSynchronously(cb) {
  cb();
}

const boundSyncFn = Rx.Observable.bindCallback(iCallMyCallbackSynchronously);
const boundAsyncFn = Rx.Observable.bindCallback(iCallMyCallbackSynchronously, null, Rx.Scheduler.async);

boundSyncFn().subscribe(() => console.log('I was sync!'));
boundAsyncFn().subscribe(() => console.log('I was async!'));
console.log('This happened...');

// Logs:
// I was sync!
// This happened...
// I was async!
Use bindCallback on object method
const boundMethod = Rx.Observable.bindCallback(someObject.methodWithCallback);
boundMethod.call(someObject) // make sure methodWithCallback has access to someObject
.subscribe(subscriber);

Test:

See:

public static bindNodeCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable source

Converts a Node.js-style callback API to a function that returns an Observable.

It's just like bindCallback, but the callback is expected to be of type callback(error, result).

bindNodeCallback is not an operator because its input and output are not Observables. The input is a function func with some parameters, but the last parameter must be a callback function that func calls when it is done. The callback function is expected to follow Node.js conventions, where the first argument to the callback is an error object, signaling whether call was successful. If that object is passed to callback, it means something went wrong.

The output of bindNodeCallback is a function that takes the same parameters as func, except the last one (the callback). When the output function is called with arguments, it will return an Observable. If func calls its callback with error parameter present, Observable will error with that value as well. If error parameter is not passed, Observable will emit second parameter. If there are more parameters (third and so on), Observable will emit an array with all arguments, except first error argument.

Optionally bindNodeCallback accepts selector function, which allows you to make resulting Observable emit value computed by selector, instead of regular callback arguments. It works similarly to bindCallback selector, but Node.js-style error argument will never be passed to that function.

Note that func will not be called at the same time output function is, but rather whenever resulting Observable is subscribed. By default call to func will happen synchronously after subscription, but that can be changed with proper Scheduler provided as optional third parameter. Scheduler can also control when values from callback will be emitted by Observable. To find out more, check out documentation for bindCallback, where Scheduler works exactly the same.

As in bindCallback, context (this property) of input function will be set to context of returned function, when it is called.

After Observable emits value, it will complete immediately. This means even if func calls callback again, values from second and consecutive calls will never appear on the stream. If you need to handle functions that call callbacks multiple times, check out fromEvent or fromEventPattern instead.

Note that bindNodeCallback can be used in non-Node.js environments as well. "Node.js-style" callbacks are just a convention, so if you write for browsers or any other environment and API you use implements that callback style, bindNodeCallback can be safely used on that API functions as well.

Remember that Error object passed to callback does not have to be an instance of JavaScript built-in Error object. In fact, it does not even have to an object. Error parameter of callback function is interpreted as "present", when value of that parameter is truthy. It could be, for example, non-zero number, non-empty string or boolean true. In all of these cases resulting Observable would error with that value. This means usually regular style callbacks will fail very often when bindNodeCallback is used. If your Observable errors much more often then you would expect, check if callback really is called in Node.js-style and, if not, switch to bindCallback instead.

Note that even if error parameter is technically present in callback, but its value is falsy, it still won't appear in array emitted by Observable or in selector function.

Params:

NameTypeAttributeDescription
func function

Function with a Node.js-style callback as the last parameter.

selector function
  • optional

A function which takes the arguments from the callback and maps those to a value to emit on the output Observable.

scheduler Scheduler
  • optional

The scheduler on which to schedule the callbacks.

Return:

function(...params: *): Observable

A function which returns the Observable that delivers the same values the Node.js callback would deliver.

Example:

Read a file from the filesystem and get the data as an Observable
import * as fs from 'fs';
var readFileAsObservable = Rx.Observable.bindNodeCallback(fs.readFile);
var result = readFileAsObservable('./roadNames.txt', 'utf8');
result.subscribe(x => console.log(x), e => console.error(e));
Use on function calling callback with multiple arguments
someFunction((err, a, b) => {
  console.log(err); // null
  console.log(a); // 5
  console.log(b); // "some string"
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction);
boundSomeFunction()
.subscribe(value => {
  console.log(value); // [5, "some string"]
});
Use with selector function
someFunction((err, a, b) => {
  console.log(err); // undefined
  console.log(a); // "abc"
  console.log(b); // "DEF"
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction, (a, b) => a + b);
boundSomeFunction()
.subscribe(value => {
  console.log(value); // "abcDEF"
});
Use on function calling callback in regular style
someFunction(a => {
  console.log(a); // 5
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction);
boundSomeFunction()
.subscribe(
  value => {}             // never gets called
  err => console.log(err) // 5
);

Test:

See:

public static combineLatest(observable1: ObservableInput, observable2: ObservableInput, project: function, scheduler: Scheduler): Observable source

Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.

combineLatest combines the values from all the Observables passed as arguments. This is done by subscribing to each Observable in order and, whenever any Observable emits, collecting an array of the most recent values from each Observable. So if you pass n Observables to operator, returned Observable will always emit an array of n values, in order corresponding to order of passed Observables (value from the first Observable on the first place and so on).

Static version of combineLatest accepts either an array of Observables or each Observable can be put directly as an argument. Note that array of Observables is good choice, if you don't know beforehand how many Observables you will combine. Passing empty array will result in Observable that completes immediately.

To ensure output array has always the same length, combineLatest will actually wait for all input Observables to emit at least once, before it starts emitting results. This means if some Observable emits values before other Observables started emitting, all that values but last will be lost. On the other hand, is some Observable does not emit value but completes, resulting Observable will complete at the same moment without emitting anything, since it will be now impossible to include value from completed Observable in resulting array. Also, if some input Observable does not emit any value and never completes, combineLatest will also never emit and never complete, since, again, it will wait for all streams to emit some value.

If at least one Observable was passed to combineLatest and all passed Observables emitted something, resulting Observable will complete when all combined streams complete. So even if some Observable completes, result of combineLatest will still emit values when other Observables do. In case of completed Observable, its value from now on will always be the last emitted value. On the other hand, if any Observable errors, combineLatest will error immediately as well, and all other Observables will be unsubscribed.

combineLatest accepts as optional parameter project function, which takes as arguments all values that would normally be emitted by resulting Observable. project can return any kind of value, which will be then emitted by Observable instead of default array. Note that project does not take as argument that array of values, but values themselves. That means default project can be imagined as function that takes all its arguments and puts them into an array.

Params:

NameTypeAttributeDescription
observable1 ObservableInput

An input Observable to combine with other Observables.

observable2 ObservableInput

An input Observable to combine with other Observables. More than one input Observables may be given as arguments or an array of Observables may be given as the first argument.

project function
  • optional

An optional function to project the values from the combined latest values into a new value on the output Observable.

scheduler Scheduler
  • optional
  • default: null

The IScheduler to use for subscribing to each input Observable.

Return:

Observable

An Observable of projected values from the most recent values from each input Observable, or an array of the most recent values from each input Observable.

Example:

Combine two timer Observables
const firstTimer = Rx.Observable.timer(0, 1000); // emit 0, 1, 2... after every second, starting from now
const secondTimer = Rx.Observable.timer(500, 1000); // emit 0, 1, 2... after every second, starting 0,5s from now
const combinedTimers = Rx.Observable.combineLatest(firstTimer, secondTimer);
combinedTimers.subscribe(value => console.log(value));
// Logs
// [0, 0] after 0.5s
// [1, 0] after 1s
// [1, 1] after 1.5s
// [2, 1] after 2s
Combine an array of Observables
const observables = [1, 5, 10].map(
  n => Rx.Observable.of(n).delay(n * 1000).startWith(0) // emit 0 and then emit n after n seconds
);
const combined = Rx.Observable.combineLatest(observables);
combined.subscribe(value => console.log(value));
// Logs
// [0, 0, 0] immediately
// [1, 0, 0] after 1s
// [1, 5, 0] after 5s
// [1, 5, 10] after 10s
Use project function to dynamically calculate the Body-Mass Index
var weight = Rx.Observable.of(70, 72, 76, 79, 75);
var height = Rx.Observable.of(1.76, 1.77, 1.78);
var bmi = Rx.Observable.combineLatest(weight, height, (w, h) => w / (h * h));
bmi.subscribe(x => console.log('BMI is ' + x));

// With output to console:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222

Test:

See:

public static concat(input1: ObservableInput, input2: ObservableInput, scheduler: Scheduler): Observable source

Creates an output Observable which sequentially emits all values from given Observable and then moves on to the next.

Concatenates multiple Observables together by sequentially emitting their values, one Observable after the other.

concat joins multiple Observables together, by subscribing to them one at a time and merging their results into the output Observable. You can pass either an array of Observables, or put them directly as arguments. Passing an empty array will result in Observable that completes immediately.

concat will subscribe to first input Observable and emit all its values, without changing or affecting them in any way. When that Observable completes, it will subscribe to then next Observable passed and, again, emit its values. This will be repeated, until the operator runs out of Observables. When last input Observable completes, concat will complete as well. At any given moment only one Observable passed to operator emits values. If you would like to emit values from passed Observables concurrently, check out merge instead, especially with optional concurrent parameter. As a matter of fact, concat is an equivalent of merge operator with concurrent parameter set to 1.

Note that if some input Observable never completes, concat will also never complete and Observables following the one that did not complete will never be subscribed. On the other hand, if some Observable simply completes immediately after it is subscribed, it will be invisible for concat, which will just move on to the next Observable.

If any Observable in chain errors, instead of passing control to the next Observable, concat will error immediately as well. Observables that would be subscribed after the one that emitted error, never will.

If you pass to concat the same Observable many times, its stream of values will be "replayed" on every subscription, which means you can repeat given Observable as many times as you like. If passing the same Observable to concat 1000 times becomes tedious, you can always use repeat.

Params:

NameTypeAttributeDescription
input1 ObservableInput

An input Observable to concatenate with others.

input2 ObservableInput

An input Observable to concatenate with others. More than one input Observables may be given as argument.

scheduler Scheduler
  • optional
  • default: null

An optional IScheduler to schedule each Observable subscription on.

Return:

Observable

All values of each passed Observable merged into a single Observable, in order, in serial fashion.

Example:

Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = Rx.Observable.concat(timer, sequence);
result.subscribe(x => console.log(x));

// results in:
// 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
Concatenate an array of 3 Observables
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var result = Rx.Observable.concat([timer1, timer2, timer3]); // note that array is passed
result.subscribe(x => console.log(x));

// results in the following:
// (Prints to console sequentially)
// -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
// -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
// -500ms-> 0 -500ms-> 1 -500ms-> ... 9
Concatenate the same Observable to repeat it
const timer = Rx.Observable.interval(1000).take(2);

Rx.Observable.concat(timer, timer) // concating the same Observable!
.subscribe(
  value => console.log(value),
  err => {},
  () => console.log('...and it is done!')
);

// Logs:
// 0 after 1s
// 1 after 2s
// 0 after 3s
// 1 after 4s
// "...and it is done!" also after 4s

See:

public static create(onSubscription: function(observer: Observer): TeardownLogic): Observable source

Creates a new Observable, that will execute the specified function when an Observer subscribes to it.

Create custom Observable, that does whatever you like.

create converts an onSubscription function to an actual Observable. Whenever someone subscribes to that Observable, the function will be called with an Observer instance as a first and only parameter. onSubscription should then invoke the Observers next, error and complete methods.

Calling next with a value will emit that value to the observer. Calling complete means that Observable finished emitting and will not do anything else. Calling error means that something went wrong - value passed to error method should provide details on what exactly happened.

A well-formed Observable can emit as many values as it needs via next method, but complete and error methods can be called only once and nothing else can be called thereafter. If you try to invoke next, complete or error methods after created Observable already completed or ended with an error, these calls will be ignored to preserve so called Observable Contract. Note that you are not required to call complete at any point - it is perfectly fine to create an Observable that never ends, depending on your needs.

onSubscription can optionally return either a function or an object with unsubscribe method. In both cases function or method will be called when subscription to Observable is being cancelled and should be used to clean up all resources. So, for example, if you are using setTimeout in your custom Observable, when someone unsubscribes, you can clear planned timeout, so that it does not fire needlessly and browser (or other environment) does not waste computing power on timing event that no one will listen to anyways.

Most of the times you should not need to use create, because existing operators allow you to create an Observable for most of the use cases. That being said, create is low-level mechanism allowing you to create any Observable, if you have very specific needs.

TypeScript signature issue

Because Observable extends class which already has defined static create function, but with different type signature, it was impossible to assign proper signature to Observable.create. Because of that, it has very general type Function and thus function passed to create will not be type checked, unless you explicitly state what signature it should have.

When using TypeScript we recommend to declare type signature of function passed to create as (observer: Observer) => TeardownLogic, where Observer and TeardownLogic are interfaces provided by the library.

Params:

NameTypeAttributeDescription
onSubscription function(observer: Observer): TeardownLogic

A function that accepts an Observer, and invokes its next, error, and complete methods as appropriate, and optionally returns some logic for cleaning up resources.

Return:

Observable

An Observable that, whenever subscribed, will execute the specified function.

Example:

Emit three numbers, then complete.
var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});
observable.subscribe(
  value => console.log(value),
  err => {},
  () => console.log('this is the end')
);

// Logs
// 1
// 2
// 3
// "this is the end"
Emit an error
const observable = Rx.Observable.create((observer) => {
  observer.error('something went really wrong...');
});

observable.subscribe(
  value => console.log(value), // will never be called
  err => console.log(err),
  () => console.log('complete') // will never be called
);

// Logs
// "something went really wrong..."
Return unsubscribe function

const observable = Rx.Observable.create(observer => {
  const id = setTimeout(() => observer.next('...'), 5000); // emit value after 5s

  return () => { clearTimeout(id); console.log('cleared!'); };
});

const subscription = observable.subscribe(value => console.log(value));

setTimeout(() => subscription.unsubscribe(), 3000); // cancel subscription after 3s

// Logs:
// "cleared!" after 3s

// Never logs "..."

See:

public static defer(observableFactory: function(): SubscribableOrPromise): Observable source

Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer.

Creates the Observable lazily, that is, only when it is subscribed.

defer allows you to create the Observable only when the Observer subscribes, and create a fresh Observable for each Observer. It waits until an Observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual Observable.

Params:

NameTypeAttributeDescription
observableFactory function(): SubscribableOrPromise

The Observable factory function to invoke for each Observer that subscribes to the output Observable. May also return a Promise, which will be converted on the fly to an Observable.

Return:

Observable

An Observable whose Observers' subscriptions trigger an invocation of the given Observable factory function.

Example:

Subscribe to either an Observable of clicks or an Observable of interval, at random
var clicksOrInterval = Rx.Observable.defer(function () {
  if (Math.random() > 0.5) {
    return Rx.Observable.fromEvent(document, 'click');
  } else {
    return Rx.Observable.interval(1000);
  }
});
clicksOrInterval.subscribe(x => console.log(x));

// Results in the following behavior:
// If the result of Math.random() is greater than 0.5 it will listen
// for clicks anywhere on the "document"; when document is clicked it
// will log a MouseEvent object to the console. If the result is less
// than 0.5 it will emit ascending numbers, one every second(1000ms).

Test:

See:

public static empty(scheduler: Scheduler): Observable source

Creates an Observable that emits no items to the Observer and immediately emits a complete notification.

Just emits 'complete', and nothing else.

This static operator is useful for creating a simple Observable that only emits the complete notification. It can be used for composing with other Observables, such as in a mergeMap.

Params:

NameTypeAttributeDescription
scheduler Scheduler
  • optional

A IScheduler to use for scheduling the emission of the complete notification.

Return:

Observable

An "empty" Observable: emits only the complete notification.

Example:

Emit the number 7, then complete.
var result = Rx.Observable.empty().startWith(7);
result.subscribe(x => console.log(x));
Map and flatten only odd numbers to the sequence 'a', 'b', 'c'
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
  x % 2 === 1 ? Rx.Observable.of('a', 'b', 'c') : Rx.Observable.empty()
);
result.subscribe(x => console.log(x));

// Results in the following to the console:
// x is equal to the count on the interval eg(0,1,2,3,...)
// x will occur every 1000ms
// if x % 2 is equal to 1 print abc
// if x % 2 is not equal to 1 nothing will be output

Test:

See:

public static forkJoin(sources: *): any source

Params:

NameTypeAttributeDescription
sources *

Return:

any

Test:

public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T> source

Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.

Converts almost anything to an Observable.

Convert various other objects and data types into Observables. from converts a Promise or an array-like or an iterable object into an Observable that emits the items in that promise or array or iterable. A String, in this context, is treated as an array of characters. Observable-like objects (contains a function named with the ES2015 Symbol for Observable) can also be converted through this operator.

Params:

NameTypeAttributeDescription
ish ObservableInput<T>

A subscribable object, a Promise, an Observable-like, an Array, an iterable or an array-like object to be converted.

scheduler Scheduler
  • optional

The scheduler on which to schedule the emissions of values.

Return:

Observable<T>

The Observable whose values are originally from the input object that was converted.

Example:

Converts an array to an Observable
var array = [10, 20, 30];
var result = Rx.Observable.from(array);
result.subscribe(x => console.log(x));

// Results in the following:
// 10 20 30
Convert an infinite iterable (from a generator) to an Observable
function* generateDoubles(seed) {
  var i = seed;
  while (true) {
    yield i;
    i = 2 * i; // double it
  }
}

var iterator = generateDoubles(3);
var result = Rx.Observable.from(iterator).take(10);
result.subscribe(x => console.log(x));

// Results in the following:
// 3 6 12 24 48 96 192 384 768 1536

Test:

See:

public static fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T> source

Creates an Observable that emits events of a specific type coming from the given event target.

Creates an Observable from DOM events, or Node EventEmitter events or others.

Creates an Observable by attaching an event listener to an "event target", which may be an object with addEventListener and removeEventListener, a Node.js EventEmitter, a jQuery style EventEmitter, a NodeList from the DOM, or an HTMLCollection from the DOM. The event handler is attached when the output Observable is subscribed, and removed when the Subscription is unsubscribed.

Params:

NameTypeAttributeDescription
target EventTargetLike

The DOMElement, event target, Node.js EventEmitter, NodeList or HTMLCollection to attach the event handler to.

eventName string

The event name of interest, being emitted by the target.

options EventListenerOptions
  • optional

Options to pass through to addEventListener

selector SelectorMethodSignature<T>
  • optional

An optional function to post-process results. It takes the arguments from the event handler and should return a single value.

Return:

Observable<T>

Example:

Emits clicks happening on the DOM document
var clicks = Rx.Observable.fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));

// Results in:
// MouseEvent object logged to console everytime a click
// occurs on the document.

Test:

See:

public static fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function, signal?: any): void, selector: function(...args: any): T): Observable<T> source

Creates an Observable from an API based on addHandler/removeHandler functions.

Converts any addHandler/removeHandler API to an Observable.

Creates an Observable by using the addHandler and removeHandler functions to add and remove the handlers, with an optional selector function to project the event arguments to a result. The addHandler is called when the output Observable is subscribed, and removeHandler is called when the Subscription is unsubscribed.

Params:

NameTypeAttributeDescription
addHandler function(handler: Function): any

A function that takes a handler function as argument and attaches it somehow to the actual source of events.

removeHandler function(handler: Function, signal?: any): void
  • optional

An optional function that takes a handler function as argument and removes it in case it was previously attached using addHandler. if addHandler returns signal to teardown when remove, removeHandler function will forward it.

selector function(...args: any): T
  • optional

An optional function to post-process results. It takes the arguments from the event handler and should return a single value.

Return:

Observable<T>

Example:

Emits clicks happening on the DOM document
function addClickHandler(handler) {
  document.addEventListener('click', handler);
}

function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}

var clicks = Rx.Observable.fromEventPattern(
  addClickHandler,
  removeClickHandler
);
clicks.subscribe(x => console.log(x));

Test:

See:

public static fromPromise(promise: Promise<T>, scheduler: Scheduler): Observable<T> source

Converts a Promise to an Observable.

Returns an Observable that just emits the Promise's resolved value, then completes.

Converts an ES2015 Promise or a Promises/A+ spec compliant Promise to an Observable. If the Promise resolves with a value, the output Observable emits that resolved value as a next, and then completes. If the Promise is rejected, then the output Observable emits the corresponding Error.

Params:

NameTypeAttributeDescription
promise Promise<T>

The promise to be converted.

scheduler Scheduler
  • optional

An optional IScheduler to use for scheduling the delivery of the resolved value (or the rejection).

Return:

Observable<T>

An Observable which wraps the Promise.

Example:

Convert the Promise returned by Fetch to an Observable
var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));

Test:

See:

public static interval(period: number, scheduler: Scheduler): Observable source

Creates an Observable that emits sequential numbers every specified interval of time, on a specified IScheduler.

Emits incremental numbers periodically in time.

interval returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between those emissions. The first emission is not sent immediately, but only after the first period has passed. By default, this operator uses the async IScheduler to provide a notion of time, but you may pass any IScheduler to it.

Params:

NameTypeAttributeDescription
period number
  • optional
  • default: 0

The interval size in milliseconds (by default) or the time unit determined by the scheduler's clock.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for scheduling the emission of values, and providing a notion of "time".

Return:

Observable

An Observable that emits a sequential number each time interval.

Example:

Emits ascending numbers, one every second (1000ms)
var numbers = Rx.Observable.interval(1000);
numbers.subscribe(x => console.log(x));

Test:

See:

public static merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable source

Creates an output Observable which concurrently emits all values from every given input Observable.

Flattens multiple Observables together by blending their values into one Observable.

merge subscribes to each given input Observable (as arguments), and simply forwards (without doing any transformation) all the values from all the input Observables to the output Observable. The output Observable only completes once all input Observables have completed. Any error delivered by an input Observable will be immediately emitted on the output Observable.

Params:

NameTypeAttributeDescription
observables ...ObservableInput

Input Observables to merge together.

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

scheduler Scheduler
  • optional
  • default: null

The IScheduler to use for managing concurrency of input Observables.

Return:

Observable

an Observable that emits items that are the result of every input Observable.

Example:

Merge together two Observables: 1s interval and clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = Rx.Observable.merge(clicks, timer);
clicksOrTimer.subscribe(x => console.log(x));

// Results in the following:
// timer will emit ascending values, one every second(1000ms) to console
// clicks logs MouseEvents to console everytime the "document" is clicked
// Since the two streams are merged you see these happening
// as they occur.
Merge together 3 Observables, but only 2 run concurrently
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // the argument
var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));

// Results in the following:
// - First timer1 and timer2 will run concurrently
// - timer1 will emit a value every 1000ms for 10 iterations
// - timer2 will emit a value every 2000ms for 6 iterations
// - after timer1 hits it's max iteration, timer2 will
//   continue, and timer3 will start to run concurrently with timer2
// - when timer2 hits it's max iteration it terminates, and
//   timer3 will continue to emit a value every 500ms until it is complete

See:

public static never(): Observable source

Creates an Observable that emits no items to the Observer.

An Observable that never emits anything.

This static operator is useful for creating a simple Observable that emits neither values nor errors nor the completion notification. It can be used for testing purposes or for composing with other Observables. Please not that by never emitting a complete notification, this Observable keeps the subscription from being disposed automatically. Subscriptions need to be manually disposed.

Return:

Observable

A "never" Observable: never emits anything.

Example:

Emit the number 7, then never emit anything else (not even complete).
function info() {
  console.log('Will not be called');
}
var result = Rx.Observable.never().startWith(7);
result.subscribe(x => console.log(x), info, info);

Test:

See:

public static of(values: ...T, scheduler: Scheduler): Observable<T> source

Creates an Observable that emits some values you specify as arguments, immediately one after the other, and then emits a complete notification.

Emits the arguments you provide, then completes.

This static operator is useful for creating a simple Observable that only emits the arguments given, and the complete notification thereafter. It can be used for composing with other Observables, such as with concat. By default, it uses a null IScheduler, which means the next notifications are sent synchronously, although with a different IScheduler it is possible to determine when those notifications will be delivered.

Params:

NameTypeAttributeDescription
values ...T

Arguments that represent next values to be emitted.

scheduler Scheduler
  • optional

A IScheduler to use for scheduling the emissions of the next notifications.

Return:

Observable<T>

An Observable that emits each given input value.

Example:

Emit 10, 20, 30, then 'a', 'b', 'c', then start ticking every second.
var numbers = Rx.Observable.of(10, 20, 30);
var letters = Rx.Observable.of('a', 'b', 'c');
var interval = Rx.Observable.interval(1000);
var result = numbers.concat(letters).concat(interval);
result.subscribe(x => console.log(x));

Test:

See:

public static range(start: number, count: number, scheduler: Scheduler): Observable source

Creates an Observable that emits a sequence of numbers within a specified range.

Emits a sequence of numbers in a range.

range operator emits a range of sequential integers, in order, where you select the start of the range and its length. By default, uses no IScheduler and just delivers the notifications synchronously, but may use an optional IScheduler to regulate those deliveries.

Params:

NameTypeAttributeDescription
start number
  • optional
  • default: 0

The value of the first integer in the sequence.

count number
  • optional
  • default: 0

The number of sequential integers to generate.

scheduler Scheduler
  • optional

A IScheduler to use for scheduling the emissions of the notifications.

Return:

Observable

An Observable of numbers that emits a finite range of sequential integers.

Example:

Emits the numbers 1 to 10
var numbers = Rx.Observable.range(1, 10);
numbers.subscribe(x => console.log(x));

Test:

See:

public static throw(error: any, scheduler: Scheduler): Observable source

Creates an Observable that emits no items to the Observer and immediately emits an error notification.

Just emits 'error', and nothing else.

This static operator is useful for creating a simple Observable that only emits the error notification. It can be used for composing with other Observables, such as in a mergeMap.

Params:

NameTypeAttributeDescription
error any

The particular Error to pass to the error notification.

scheduler Scheduler
  • optional

A IScheduler to use for scheduling the emission of the error notification.

Return:

Observable

An error Observable: emits only the error notification using the given error argument.

Example:

Emit the number 7, then emit an error.
var result = Rx.Observable.throw(new Error('oops!')).startWith(7);
result.subscribe(x => console.log(x), e => console.error(e));
Map and flatten numbers to the sequence 'a', 'b', 'c', but throw an error for 13
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
  x === 13 ?
    Rx.Observable.throw('Thirteens are bad') :
    Rx.Observable.of('a', 'b', 'c')
);
result.subscribe(x => console.log(x), e => console.error(e));

Test:

See:

public static timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable source

Creates an Observable that starts emitting after an initialDelay and emits ever increasing numbers after each period of time thereafter.

Its like interval, but you can specify when should the emissions start.

timer returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time, period of your choosing between those emissions. The first emission happens after the specified initialDelay. The initial delay may be a Date. By default, this operator uses the async IScheduler to provide a notion of time, but you may pass any IScheduler to it. If period is not specified, the output Observable emits only one value, 0. Otherwise, it emits an infinite sequence.

Params:

NameTypeAttributeDescription
initialDelay number | Date

The initial delay time to wait before emitting the first value of 0.

period number
  • optional

The period of time between emissions of the subsequent numbers.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for scheduling the emission of values, and providing a notion of "time".

Return:

Observable

An Observable that emits a 0 after the initialDelay and ever increasing numbers after each period of time thereafter.

Example:

Emits ascending numbers, one every second (1000ms), starting after 3 seconds
var numbers = Rx.Observable.timer(3000, 1000);
numbers.subscribe(x => console.log(x));
Emits one number after five seconds
var numbers = Rx.Observable.timer(5000);
numbers.subscribe(x => console.log(x));

Test:

See:

public static webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject source

Wrapper around the w3c-compatible WebSocket object provided by the browser.

Params:

NameTypeAttributeDescription
urlConfigOrSource string | WebSocketSubjectConfig

the source of the websocket as an url or a structure defining the websocket object

Example:

Wraps browser WebSocket

let subject = Observable.webSocket('ws://localhost:8081');
subject.subscribe(
   (msg) => console.log('message received: ' + msg),
   (err) => console.log(err),
   () => console.log('complete')
 );
subject.next(JSON.stringify({ op: 'hello' }));
Wraps WebSocket from nodejs-websocket (using node.js)

import { w3cwebsocket } from 'websocket';

let socket = new WebSocketSubject({
  url: 'ws://localhost:8081',
  WebSocketCtor: w3cwebsocket
});

let subject = Observable.webSocket('ws://localhost:8081');
subject.subscribe(
   (msg) => console.log('message received: ' + msg),
   (err) => console.log(err),
   () => console.log('complete')
 );
subject.next(JSON.stringify({ op: 'hello' }));

Test:

public static zip(observables: *): Observable<R> source

Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.

If the latest parameter is a function, this function is used to compute the created value from the input values. Otherwise, an array of the input values is returned.

Params:

NameTypeAttributeDescription
observables *

Return:

Observable<R>

Example:

Combine age and name from different sources

let age$ = Observable.of<number>(27, 25, 29);
let name$ = Observable.of<string>('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of<boolean>(true, true, false);

Observable
    .zip(age$,
         name$,
         isDev$,
         (age: number, name: string, isDev: boolean) => ({ age, name, isDev }))
    .subscribe(x => console.log(x));

// outputs
// { age: 27, name: 'Foo', isDev: true }
// { age: 25, name: 'Bar', isDev: true }
// { age: 29, name: 'Beer', isDev: false }

Test:

Public Constructors

public constructor(subscribe: Function) source

Params:

NameTypeAttributeDescription
subscribe Function

the function that is called when the Observable is initially subscribed to. This function is given a Subscriber, to which new values can be nexted, or an error method can be called to raise an error, or complete can be called to notify of a successful completion.

Public Methods

public [$$observable](): Observable source

An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable

Return:

Observable

this instance of the observable

public audit(durationSelector: function(value: T): SubscribableOrPromise): Observable<T> source

Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.

It's like auditTime, but the silencing duration is determined by a second Observable.

audit is similar to throttle, but emits the last value from the silenced time window, instead of the first value. audit emits the most recent value from the source Observable on the output Observable as soon as its internal timer becomes disabled, and ignores source values while the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, the timer is enabled by calling the durationSelector function with the source value, which returns the "duration" Observable. When the duration Observable emits a value or completes, the timer is disabled, then the most recent source value is emitted on the output Observable, and this process repeats for the next source value.

Params:

NameTypeAttributeDescription
durationSelector function(value: T): SubscribableOrPromise

A function that receives a value from the source Observable, for computing the silencing duration, returned as an Observable or a Promise.

Return:

Observable<T>

An Observable that performs rate-limiting of emissions from the source Observable.

Example:

Emit clicks at a rate of at most one click per second
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.audit(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public auditTime(duration: number, scheduler: Scheduler): Observable<T> source

Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.

When it sees a source values, it ignores that plus the next ones for duration milliseconds, and then it emits the most recent value from the source.

auditTime is similar to throttleTime, but emits the last value from the silenced time window, instead of the first value. auditTime emits the most recent value from the source Observable on the output Observable as soon as its internal timer becomes disabled, and ignores source values while the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, the timer is enabled. After duration milliseconds (or the time unit determined internally by the optional scheduler) has passed, the timer is disabled, then the most recent source value is emitted on the output Observable, and this process repeats for the next source value. Optionally takes a IScheduler for managing timers.

Params:

NameTypeAttributeDescription
duration number

Time to wait before emitting the most recent source value, measured in milliseconds or the time unit determined internally by the optional scheduler.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for managing the timers that handle the rate-limiting behavior.

Return:

Observable<T>

An Observable that performs rate-limiting of emissions from the source Observable.

Example:

Emit clicks at a rate of at most one click per second
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.auditTime(1000);
result.subscribe(x => console.log(x));

Test:

See:

public buffer(closingNotifier: Observable<any>): Observable<T[]> source

Buffers the source Observable values until closingNotifier emits.

Collects values from the past as an array, and emits that array only when another Observable emits.

Buffers the incoming Observable values until the given closingNotifier Observable emits a value, at which point it emits the buffer on the output Observable and starts a new buffer internally, awaiting the next time closingNotifier emits.

Params:

NameTypeAttributeDescription
closingNotifier Observable<any>

An Observable that signals the buffer to be emitted on the output Observable.

Return:

Observable<T[]>

An Observable of buffers, which are arrays of values.

Example:

On every click, emit array of most recent interval events
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var buffered = interval.buffer(clicks);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]> source

Buffers the source Observable values until the size hits the maximum bufferSize given.

Collects values from the past as an array, and emits that array only when its size reaches bufferSize.

Buffers a number of values from the source Observable by bufferSize then emits the buffer and clears it, and starts a new buffer each startBufferEvery values. If startBufferEvery is not provided or is null, then new buffers are started immediately at the start of the source and when each buffer closes and is emitted.

Params:

NameTypeAttributeDescription
bufferSize number

The maximum size of the buffer emitted.

startBufferEvery number
  • optional

Interval at which to start a new buffer. For example if startBufferEvery is 2, then a new buffer will be started on every other value from the source. A new buffer is started at the beginning of the source by default.

Return:

Observable<T[]>

An Observable of arrays of buffered values.

Example:

Emit the last two click events as an array
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2);
buffered.subscribe(x => console.log(x));
On every click, emit the last two click events as an array
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2, 1);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]> source

Buffers the source Observable values for a specific time period.

Collects values from the past as an array, and emits those arrays periodically in time.

Buffers values from the source for a specific time duration bufferTimeSpan. Unless the optional argument bufferCreationInterval is given, it emits and resets the buffer every bufferTimeSpan milliseconds. If bufferCreationInterval is given, this operator opens the buffer every bufferCreationInterval milliseconds and closes (emits and resets) the buffer every bufferTimeSpan milliseconds. When the optional argument maxBufferSize is specified, the buffer will be closed either after bufferTimeSpan milliseconds or when it contains maxBufferSize elements.

Params:

NameTypeAttributeDescription
bufferTimeSpan number

The amount of time to fill each buffer array.

bufferCreationInterval number
  • optional

The interval at which to start new buffers.

maxBufferSize number
  • optional

The maximum buffer size.

scheduler Scheduler
  • optional
  • default: async

The scheduler on which to schedule the intervals that determine buffer boundaries.

Return:

Observable<T[]>

An observable of arrays of buffered values.

Example:

Every second, emit an array of the recent click events
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(1000);
buffered.subscribe(x => console.log(x));
Every 5 seconds, emit the click events from the next 2 seconds
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(2000, 5000);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]> source

Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits.

Collects values from the past as an array. Starts collecting only when opening emits, and calls the closingSelector function to get an Observable that tells when to close the buffer.

Buffers values from the source by opening the buffer via signals from an Observable provided to openings, and closing and sending the buffers when a Subscribable or Promise returned by the closingSelector function emits.

Params:

NameTypeAttributeDescription
openings SubscribableOrPromise<O>

A Subscribable or Promise of notifications to start new buffers.

closingSelector function(value: O): SubscribableOrPromise

A function that takes the value emitted by the openings observable and returns a Subscribable or Promise, which, when it emits, signals that the associated buffer should be emitted and cleared.

Return:

Observable<T[]>

An observable of arrays of buffered values.

Example:

Every other second, emit the click events from the next 500ms
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var buffered = clicks.bufferToggle(openings, i =>
  i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferWhen(closingSelector: function(): Observable): Observable<T[]> source

Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer.

Collects values from the past as an array. When it starts collecting values, it calls a function that returns an Observable that tells when to close the buffer and restart collecting.

Opens a buffer immediately, then closes the buffer when the observable returned by calling closingSelector function emits a value. When it closes the buffer, it immediately opens a new buffer and repeats the process.

Params:

NameTypeAttributeDescription
closingSelector function(): Observable

A function that takes no arguments and returns an Observable that signals buffer closure.

Return:

Observable<T[]>

An observable of arrays of buffered values.

Example:

Emit an array of the last clicks every [1-5] random seconds
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferWhen(() =>
  Rx.Observable.interval(1000 + Math.random() * 4000)
);
buffered.subscribe(x => console.log(x));

Test:

See:

public catch(selector: function): Observable source

Catches errors on the observable to be handled by returning a new observable or throwing an error.

Params:

NameTypeAttributeDescription
selector function

a function that takes as arguments err, which is the error, and caught, which is the source observable, in case you'd like to "retry" that observable by returning it again. Whatever observable is returned by the selector will be used to continue the observable chain.

Return:

Observable

An observable that originates from either the source or the observable returned by the catch selector function.

Example:

Continues with a different Observable when there's an error

Observable.of(1, 2, 3, 4, 5)
  .map(n => {
	   if (n == 4) {
	     throw 'four!';
    }
   return n;
  })
  .catch(err => Observable.of('I', 'II', 'III', 'IV', 'V'))
  .subscribe(x => console.log(x));
  // 1, 2, 3, I, II, III, IV, V
Retries the caught source Observable again in case of error, similar to retry() operator

Observable.of(1, 2, 3, 4, 5)
  .map(n => {
	   if (n === 4) {
	     throw 'four!';
    }
	   return n;
  })
  .catch((err, caught) => caught)
  .take(30)
  .subscribe(x => console.log(x));
  // 1, 2, 3, 1, 2, 3, ...
Throws a new error when the source Observable throws an error

Observable.of(1, 2, 3, 4, 5)
  .map(n => {
    if (n == 4) {
      throw 'four!';
    }
    return n;
  })
  .catch(err => {
    throw 'error in source. Details: ' + err;
  })
  .subscribe(
    x => console.log(x),
    err => console.log(err)
  );
  // 1, 2, 3, error in source. Details: four!

Test:

public combineAll(project: function): Observable source

Converts a higher-order Observable into a first-order Observable by waiting for the outer Observable to complete, then applying combineLatest.

Flattens an Observable-of-Observables by applying combineLatest when the Observable-of-Observables completes.

Takes an Observable of Observables, and collects all Observables from it. Once the outer Observable completes, it subscribes to all collected Observables and combines their values using the combineLatest strategy, such that:

  • Every time an inner Observable emits, the output Observable emits.
  • When the returned observable emits, it emits all of the latest values by:
    • If a project function is provided, it is called with each recent value from each inner Observable in whatever order they arrived, and the result of the project function is what is emitted by the output Observable.
    • If there is no project function, an array of all of the most recent values is emitted by the output Observable.

Params:

NameTypeAttributeDescription
project function
  • optional

An optional function to map the most recent values from each inner Observable into a new result. Takes each of the most recent values from each collected inner Observable as arguments, in order.

Return:

Observable

An Observable of projected results or arrays of recent values.

Example:

Map two click events to a finite interval Observable, then apply combineAll
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map(ev =>
  Rx.Observable.interval(Math.random()*2000).take(3)
).take(2);
var result = higherOrder.combineAll();
result.subscribe(x => console.log(x));

Test:

See:

public combineLatest(other: ObservableInput, project: function): Observable source

Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.

Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.

combineLatest combines the values from this Observable with values from Observables passed as arguments. This is done by subscribing to each Observable, in order, and collecting an array of each of the most recent values any time any of the input Observables emits, then either taking that array and passing it as arguments to an optional project function and emitting the return value of that, or just emitting the array of recent values directly if there is no project function.

Params:

NameTypeAttributeDescription
other ObservableInput

An input Observable to combine with the source Observable. More than one input Observables may be given as argument.

project function
  • optional

An optional function to project the values from the combined latest values into a new value on the output Observable.

Return:

Observable

An Observable of projected values from the most recent values from each input Observable, or an array of the most recent values from each input Observable.

Example:

Dynamically calculate the Body-Mass Index from an Observable of weight and one for height
var weight = Rx.Observable.of(70, 72, 76, 79, 75);
var height = Rx.Observable.of(1.76, 1.77, 1.78);
var bmi = weight.combineLatest(height, (w, h) => w / (h * h));
bmi.subscribe(x => console.log('BMI is ' + x));

// With output to console:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222

See:

public concat(other: ObservableInput, scheduler: Scheduler): Observable source

Creates an output Observable which sequentially emits all values from every given input Observable after the current Observable.

Concatenates multiple Observables together by sequentially emitting their values, one Observable after the other.

Joins this Observable with multiple other Observables by subscribing to them one at a time, starting with the source, and merging their results into the output Observable. Will wait for each Observable to complete before moving on to the next.

Params:

NameTypeAttributeDescription
other ObservableInput

An input Observable to concatenate after the source Observable. More than one input Observables may be given as argument.

scheduler Scheduler
  • optional
  • default: null

An optional IScheduler to schedule each Observable subscription on.

Return:

Observable

All values of each passed Observable merged into a single Observable, in order, in serial fashion.

Example:

Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = timer.concat(sequence);
result.subscribe(x => console.log(x));

// results in:
// 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
Concatenate 3 Observables
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var result = timer1.concat(timer2, timer3);
result.subscribe(x => console.log(x));

// results in the following:
// (Prints to console sequentially)
// -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
// -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
// -500ms-> 0 -500ms-> 1 -500ms-> ... 9

Test:

See:

public concatAll(): Observable source

Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.

Flattens an Observable-of-Observables by putting one inner Observable after the other.

Joins every Observable emitted by the source (a higher-order Observable), in a serial fashion. It subscribes to each inner Observable only after the previous inner Observable has completed, and merges all of their values into the returned observable.

Warning: If the source Observable emits Observables quickly and endlessly, and the inner Observables it emits generally complete slower than the source emits, you can run into memory issues as the incoming Observables collect in an unbounded buffer.

Note: concatAll is equivalent to mergeAll with concurrency parameter set to 1.

Return:

Observable

An Observable emitting values from all the inner Observables concatenated.

Example:

For each click event, tick every second from 0 to 3, with no concurrency
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map(ev => Rx.Observable.interval(1000).take(4));
var firstOrder = higherOrder.concatAll();
firstOrder.subscribe(x => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3

Test:

See:

public concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

Maps each value to an Observable, then flattens all of these inner Observables using concatAll.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an (so-called "inner") Observable. Each new inner Observable is concatenated with the previous inner Observable.

Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.

Note: concatMap is equivalent to mergeMap with concurrency parameter set to 1.

Params:

NameTypeAttributeDescription
project function(value: T, ?index: number): ObservableInput

A function that, when applied to an item emitted by the source Observable, returns an Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

Return:

Observable

An Observable that emits the result of applying the projection function (and the optional resultSelector) to each item emitted by the source Observable and taking values from each projected inner Observable sequentially.

Example:

For each click event, tick every second from 0 to 3, with no concurrency
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMap(ev => Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3

Test:

See:

public concatMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

Projects each source value to the same Observable which is merged multiple times in a serialized fashion on the output Observable.

It's like concatMap, but maps each value always to the same inner Observable.

Maps each source value to the given Observable innerObservable regardless of the source value, and then flattens those resulting Observables into one single Observable, which is the output Observable. Each new innerObservable instance emitted on the output Observable is concatenated with the previous innerObservable instance.

Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.

Note: concatMapTo is equivalent to mergeMapTo with concurrency parameter set to 1.

Params:

NameTypeAttributeDescription
innerObservable ObservableInput

An Observable to replace each value from the source Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

Return:

Observable

An observable of values merged together by joining the passed observable with itself, one after the other, for each value emitted from the source.

Example:

For each click event, tick every second from 0 to 3, with no concurrency
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMapTo(Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3

Test:

See:

public count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable source

Counts the number of emissions on the source and emits that number when the source completes.

Tells how many values were emitted, when the source completes.

count transforms an Observable that emits values into an Observable that emits a single value that represents the number of values emitted by the source Observable. If the source Observable terminates with an error, count will pass this error notification along without emitting a value first. If the source Observable does not terminate at all, count will neither emit a value nor terminate. This operator takes an optional predicate function as argument, in which case the output emission will represent the number of source values that matched true with the predicate.

Params:

NameTypeAttributeDescription
predicate function(value: T, i: number, source: Observable<T>): boolean
  • optional

A boolean function to select what values are to be counted. It is provided with arguments of:

  • value: the value from the source Observable.
  • index: the (zero-based) "index" of the value from the source Observable.
  • source: the source Observable instance itself.

Return:

Observable

An Observable of one number that represents the count as described above.

Example:

Counts how many seconds have passed before the first click happened
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var secondsBeforeClick = seconds.takeUntil(clicks);
var result = secondsBeforeClick.count();
result.subscribe(x => console.log(x));
Counts how many odd numbers are there between 1 and 7
var numbers = Rx.Observable.range(1, 7);
var result = numbers.count(i => i % 2 === 1);
result.subscribe(x => console.log(x));

// Results in:
// 4

Test:

See:

public debounce(durationSelector: function(value: T): SubscribableOrPromise): Observable source

Emits a value from the source Observable only after a particular time span determined by another Observable has passed without another source emission.

It's like debounceTime, but the time span of emission silence is determined by a second Observable.

debounce delays values emitted by the source Observable, but drops previous pending delayed emissions if a new value arrives on the source Observable. This operator keeps track of the most recent value from the source Observable, and spawns a duration Observable by calling the durationSelector function. The value is emitted only when the duration Observable emits a value or completes, and if no other value was emitted on the source Observable since the duration Observable was spawned. If a new value appears before the duration Observable emits, the previous value will be dropped and will not be emitted on the output Observable.

Like debounceTime, this is a rate-limiting operator, and also a delay-like operator since output emissions do not necessarily occur at the same time as they did on the source Observable.

Params:

NameTypeAttributeDescription
durationSelector function(value: T): SubscribableOrPromise

A function that receives a value from the source Observable, for computing the timeout duration for each source value, returned as an Observable or a Promise.

Return:

Observable

An Observable that delays the emissions of the source Observable by the specified duration Observable returned by durationSelector, and may drop some values if they occur too frequently.

Example:

Emit the most recent click after a burst of clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounce(() => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public debounceTime(dueTime: number, scheduler: Scheduler): Observable source

Emits a value from the source Observable only after a particular time span has passed without another source emission.

It's like delay, but passes only the most recent value from each burst of emissions.

debounceTime delays values emitted by the source Observable, but drops previous pending delayed emissions if a new value arrives on the source Observable. This operator keeps track of the most recent value from the source Observable, and emits that only when dueTime enough time has passed without any other value appearing on the source Observable. If a new value appears before dueTime silence occurs, the previous value will be dropped and will not be emitted on the output Observable.

This is a rate-limiting operator, because it is impossible for more than one value to be emitted in any time window of duration dueTime, but it is also a delay-like operator since output emissions do not occur at the same time as they did on the source Observable. Optionally takes a IScheduler for managing timers.

Params:

NameTypeAttributeDescription
dueTime number

The timeout duration in milliseconds (or the time unit determined internally by the optional scheduler) for the window of time required to wait for emission silence before emitting the most recent source value.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for managing the timers that handle the timeout for each value.

Return:

Observable

An Observable that delays the emissions of the source Observable by the specified dueTime, and may drop some values if they occur too frequently.

Example:

Emit the most recent click after a burst of clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounceTime(1000);
result.subscribe(x => console.log(x));

Test:

See:

public defaultIfEmpty(defaultValue: any): Observable source

Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.

If the source Observable turns out to be empty, then this operator will emit a default value.

defaultIfEmpty emits the values emitted by the source Observable or a specified default value if the source Observable is empty (completes without having emitted any next value).

Params:

NameTypeAttributeDescription
defaultValue any
  • optional
  • default: null

The default value used if the source Observable is empty.

Return:

Observable

An Observable that emits either the specified defaultValue if the source Observable emits no items, or the values emitted by the source Observable.

Example:

If no clicks happen in 5 seconds, then emit "no clicks"
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksBeforeFive = clicks.takeUntil(Rx.Observable.interval(5000));
var result = clicksBeforeFive.defaultIfEmpty('no clicks');
result.subscribe(x => console.log(x));

Test:

See:

public delay(delay: number | Date, scheduler: Scheduler): Observable source

Delays the emission of items from the source Observable by a given timeout or until a given Date.

Time shifts each item by some specified amount of milliseconds.

If the delay argument is a Number, this operator time shifts the source Observable by that amount of time expressed in milliseconds. The relative time intervals between the values are preserved.

If the delay argument is a Date, this operator time shifts the start of the Observable execution until the given date occurs.

Params:

NameTypeAttributeDescription
delay number | Date

The delay duration in milliseconds (a number) or a Date until which the emission of the source items is delayed.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for managing the timers that handle the time-shift for each item.

Return:

Observable

An Observable that delays the emissions of the source Observable by the specified timeout or Date.

Example:

Delay each click by one second
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
delayedClicks.subscribe(x => console.log(x));
Delay all clicks until a future date happens
var clicks = Rx.Observable.fromEvent(document, 'click');
var date = new Date('March 15, 2050 12:00:00'); // in the future
var delayedClicks = clicks.delay(date); // click emitted only after that date
delayedClicks.subscribe(x => console.log(x));

Test:

See:

public delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable source

Delays the emission of items from the source Observable by a given time span determined by the emissions of another Observable.

It's like delay, but the time span of the delay duration is determined by a second Observable.

delayWhen time shifts each emitted value from the source Observable by a time span determined by another Observable. When the source emits a value, the delayDurationSelector function is called with the source value as argument, and should return an Observable, called the "duration" Observable. The source value is emitted on the output Observable only when the duration Observable emits a value or completes.

Optionally, delayWhen takes a second argument, subscriptionDelay, which is an Observable. When subscriptionDelay emits its first value or completes, the source Observable is subscribed to and starts behaving like described in the previous paragraph. If subscriptionDelay is not provided, delayWhen will subscribe to the source Observable as soon as the output Observable is subscribed.

Params:

NameTypeAttributeDescription
delayDurationSelector function(value: T): Observable

A function that returns an Observable for each value emitted by the source Observable, which is then used to delay the emission of that item on the output Observable until the Observable returned from this function emits a value.

subscriptionDelay Observable

An Observable that triggers the subscription to the source Observable once it emits any value.

Return:

Observable

An Observable that delays the emissions of the source Observable by an amount of time specified by the Observable returned by delayDurationSelector.

Example:

Delay each click by a random amount of time, between 0 and 5 seconds
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delayWhen(event =>
  Rx.Observable.interval(Math.random() * 5000)
);
delayedClicks.subscribe(x => console.log(x));

Test:

See:

public dematerialize(): Observable source

Converts an Observable of Notification objects into the emissions that they represent.

Unwraps Notification objects as actual next, error and complete emissions. The opposite of materialize.

dematerialize is assumed to operate an Observable that only emits Notification objects as next emissions, and does not emit any error. Such Observable is the output of a materialize operation. Those notifications are then unwrapped using the metadata they contain, and emitted as next, error, and complete on the output Observable.

Use this operator in conjunction with materialize.

Return:

Observable

An Observable that emits items and notifications embedded in Notification objects emitted by the source Observable.

Example:

Convert an Observable of Notifications to an actual Observable
var notifA = new Rx.Notification('N', 'A');
var notifB = new Rx.Notification('N', 'B');
var notifE = new Rx.Notification('E', void 0,
  new TypeError('x.toUpperCase is not a function')
);
var materialized = Rx.Observable.of(notifA, notifB, notifE);
var upperCase = materialized.dematerialize();
upperCase.subscribe(x => console.log(x), e => console.error(e));

// Results in:
// A
// B
// TypeError: x.toUpperCase is not a function

Test:

See:

public distinct(keySelector: function, flushes: Observable): Observable source

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.

If a keySelector function is provided, then it will project each value from the source observable into a new value that it will check for equality with previously projected values. If a keySelector function is not provided, it will use each value from the source observable directly with an equality check against previous values.

In JavaScript runtimes that support Set, this operator will use a Set to improve performance of the distinct value checking.

In other runtimes, this operator will use a minimal implementation of Set that relies on an Array and indexOf under the hood, so performance will degrade as more values are checked for distinction. Even in newer browsers, a long-running distinct use might result in memory leaks. To help alleviate this in some scenarios, an optional flushes parameter is also provided so that the internal Set can be "flushed", basically clearing it of values.

Params:

NameTypeAttributeDescription
keySelector function
  • optional

Optional function to select which value you want to check as distinct.

flushes Observable
  • optional

Optional Observable for flushing the internal HashSet of the operator.

Return:

Observable

An Observable that emits items from the source Observable with distinct values.

Example:

A simple example with numbers
Observable.of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)
  .distinct()
  .subscribe(x => console.log(x)); // 1, 2, 3, 4
An example using a keySelector function
interface Person {
   age: number,
   name: string
}

Observable.of<Person>(
    { age: 4, name: 'Foo'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo'})
    .distinct((p: Person) => p.name)
    .subscribe(x => console.log(x));

// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }

Test:

See:

public distinctUntilChanged(compare: function): Observable source

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.

If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.

If a comparator function is not provided, an equality check is used by default.

Params:

NameTypeAttributeDescription
compare function
  • optional

Optional comparison function called to test if an item is distinct from the previous item in the source.

Return:

Observable

An Observable that emits items from the source Observable with distinct values.

Example:

A simple example with numbers
Observable.of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4)
  .distinctUntilChanged()
  .subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4
An example using a compare function
interface Person {
   age: number,
   name: string
}

Observable.of<Person>(
    { age: 4, name: 'Foo'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo'})
    { age: 6, name: 'Foo'})
    .distinctUntilChanged((p: Person, q: Person) => p.name === q.name)
    .subscribe(x => console.log(x));

// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }

Test:

See:

public distinctUntilKeyChanged(key: string, compare: function): Observable source

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item, using a property accessed by using the key provided to check if the two items are distinct.

If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.

If a comparator function is not provided, an equality check is used by default.

Params:

NameTypeAttributeDescription
key string

String key for object property lookup on each item.

compare function
  • optional

Optional comparison function called to test if an item is distinct from the previous item in the source.

Return:

Observable

An Observable that emits items from the source Observable with distinct values based on the key specified.

Example:

An example comparing the name of persons

 interface Person {
    age: number,
    name: string
 }

Observable.of<Person>(
    { age: 4, name: 'Foo'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo'},
    { age: 6, name: 'Foo'})
    .distinctUntilKeyChanged('name')
    .subscribe(x => console.log(x));

// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }
An example comparing the first letters of the name

interface Person {
    age: number,
    name: string
 }

Observable.of<Person>(
    { age: 4, name: 'Foo1'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo2'},
    { age: 6, name: 'Foo3'})
    .distinctUntilKeyChanged('name', (x: string, y: string) => x.substring(0, 3) === y.substring(0, 3))
    .subscribe(x => console.log(x));

// displays:
// { age: 4, name: 'Foo1' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo2' }

Test:

See:

public do(nextOrObserver: Observer | function, error: function, complete: function): Observable source

Perform a side effect for every emission on the source Observable, but return an Observable that is identical to the source.

Intercepts each emission on the source and runs a function, but returns an output which is identical to the source.

Returns a mirrored Observable of the source Observable, but modified so that the provided Observer is called to perform a side effect for every value, error, and completion emitted by the source. Any errors that are thrown in the aforementioned Observer or handlers are safely sent down the error path of the output Observable.

This operator is useful for debugging your Observables for the correct values or performing other side effects.

Note: this is different to a subscribe on the Observable. If the Observable returned by do is not subscribed, the side effects specified by the Observer will never happen. do therefore simply spies on existing execution, it does not trigger an execution to happen like subscribe does.

Params:

NameTypeAttributeDescription
nextOrObserver Observer | function
  • optional

A normal Observer object or a callback for next.

error function
  • optional

Callback for errors in the source.

complete function
  • optional

Callback for the completion of the source.

Return:

Observable

An Observable identical to the source, but runs the specified Observer or callback(s) for each item.

Example:

Map every every click to the clientX position of that click, while also logging the click event
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks
  .do(ev => console.log(ev))
  .map(ev => ev.clientX);
positions.subscribe(x => console.log(x));

See:

public elementAt(index: number, defaultValue: T): Observable source

Emits the single value at the specified index in a sequence of emissions from the source Observable.

Emits only the i-th value, then completes.

elementAt returns an Observable that emits the item at the specified index in the source Observable, or a default value if that index is out of range and the default argument is provided. If the default argument is not given and the index is out of range, the output Observable will emit an ArgumentOutOfRangeError error.

Params:

NameTypeAttributeDescription
index number

Is the number i for the i-th source emission that has happened since the subscription, starting from the number 0.

defaultValue T
  • optional

The default value returned for missing indices.

Return:

Observable

An Observable that emits a single item, if it is found. Otherwise, will emit the default value if given. If not, then emits an error.

Throw:

ArgumentOutOfRangeError

When using elementAt(i), it delivers an ArgumentOutOrRangeError to the Observer's error callback if i < 0 or the Observable has completed before emitting the i-th next notification.

Example:

Emit only the third click event
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.elementAt(2);
result.subscribe(x => console.log(x));

// Results in:
// click 1 = nothing
// click 2 = nothing
// click 3 = MouseEvent object logged to console

Test:

See:

public every(predicate: function, thisArg: any): Observable source

Returns an Observable that emits whether or not every item of the source satisfies the condition specified.

Params:

NameTypeAttributeDescription
predicate function

A function for determining if an item meets a specified condition.

thisArg any
  • optional

Optional object to use for this in the callback.

Return:

Observable

An Observable of booleans that determines if all items of the source Observable meet the condition specified.

Example:

A simple example emitting true if all elements are less than 5, false otherwise
 Observable.of(1, 2, 3, 4, 5, 6)
    .every(x => x < 5)
    .subscribe(x => console.log(x)); // -> false

Test:

public exhaust(): Observable source

Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.

Flattens an Observable-of-Observables by dropping the next inner Observables while the current inner is still executing.

exhaust subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, the output Observable begins emitting the items emitted by that inner Observable. So far, it behaves like mergeAll. However, exhaust ignores every new inner Observable if the previous Observable has not yet completed. Once that one completes, it will accept and flatten the next inner Observable and repeat this process.

Return:

Observable

An Observable that takes a source of Observables and propagates the first observable exclusively until it completes before subscribing to the next.

Example:

Run a finite timer for each click, only if there is no currently active timer
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var result = higherOrder.exhaust();
result.subscribe(x => console.log(x));

Test:

See:

public exhaustMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.

Maps each value to an Observable, then flattens all of these inner Observables using exhaust.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an (so-called "inner") Observable. When it projects a source value to an Observable, the output Observable begins emitting the items emitted by that projected Observable. However, exhaustMap ignores every new projected Observable if the previous projected Observable has not yet completed. Once that one completes, it will accept and flatten the next projected Observable and repeat this process.

Params:

NameTypeAttributeDescription
project function(value: T, ?index: number): ObservableInput

A function that, when applied to an item emitted by the source Observable, returns an Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

Return:

Observable

An Observable containing projected Observables of each item of the source, ignoring projected Observables that start before their preceding Observable has completed.

Example:

Run a finite timer for each click, only if there is no currently active timer
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.exhaustMap((ev) => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public expand(project: function(value: T, index: number), concurrent: number, scheduler: Scheduler): Observable source

Recursively projects each source value to an Observable which is merged in the output Observable.

It's similar to mergeMap, but applies the projection function to every source value as well as every output value. It's recursive.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger. Expand will re-emit on the output Observable every source value. Then, each output value is given to the project function which returns an inner Observable to be merged on the output Observable. Those output values resulting from the projection are also given to the project function to produce new output values. This is how expand behaves recursively.

Params:

NameTypeAttributeDescription
project function(value: T, index: number)

A function that, when applied to an item emitted by the source or the output Observable, returns an Observable.

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

scheduler Scheduler
  • optional
  • default: null

The IScheduler to use for subscribing to each projected inner Observable.

Return:

Observable

An Observable that emits the source values and also result of applying the projection function to each value emitted on the output Observable and and merging the results of the Observables obtained from this transformation.

Example:

Start emitting the powers of two on every click, at most 10 of them
var clicks = Rx.Observable.fromEvent(document, 'click');
var powersOfTwo = clicks
  .mapTo(1)
  .expand(x => Rx.Observable.of(2 * x).delay(1000))
  .take(10);
powersOfTwo.subscribe(x => console.log(x));

Test:

See:

public filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable source

Filter items emitted by the source Observable by only emitting those that satisfy a specified predicate.

Like Array.prototype.filter(), it only emits a value from the source if it passes a criterion function.

Similar to the well-known Array.prototype.filter method, this operator takes values from the source Observable, passes them through a predicate function and only emits those values that yielded true.

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number): boolean

A function that evaluates each value emitted by the source Observable. If it returns true, the value is emitted, if false the value is not passed to the output Observable. The index parameter is the number i for the i-th source emission that has happened since the subscription, starting from the number 0.

thisArg any
  • optional

An optional argument to determine the value of this in the predicate function.

Return:

Observable

An Observable of values from the source that were allowed by the predicate function.

Example:

Emit only click events whose target was a DIV element
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksOnDivs = clicks.filter(ev => ev.target.tagName === 'DIV');
clicksOnDivs.subscribe(x => console.log(x));

Test:

See:

public find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T> source

Emits only the first value emitted by the source Observable that meets some condition.

Finds the first value that passes some test and emits that.

find searches for the first item in the source Observable that matches the specified condition embodied by the predicate, and returns the first occurrence in the source. Unlike first, the predicate is required in find, and does not emit an error if a valid value is not found.

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number, source: Observable<T>): boolean

A function called with each item to test for condition matching.

thisArg any
  • optional

An optional argument to determine the value of this in the predicate function.

Return:

Observable<T>

An Observable of the first item that matches the condition.

Example:

Find and emit the first click that happens on a DIV element
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.find(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

Test:

See:

public findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable source

Emits only the index of the first value emitted by the source Observable that meets some condition.

It's like find, but emits the index of the found value, not the value itself.

findIndex searches for the first item in the source Observable that matches the specified condition embodied by the predicate, and returns the (zero-based) index of the first occurrence in the source. Unlike first, the predicate is required in findIndex, and does not emit an error if a valid value is not found.

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number, source: Observable<T>): boolean

A function called with each item to test for condition matching.

thisArg any
  • optional

An optional argument to determine the value of this in the predicate function.

Return:

Observable

An Observable of the index of the first item that matches the condition.

Example:

Emit the index of first click that happens on a DIV element
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.findIndex(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

Test:

See:

public first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R> source

Emits only the first value (or the first value that meets some condition) emitted by the source Observable.

Emits only the first value. Or emits only the first value that passes some test.

If called with no arguments, first emits the first value of the source Observable, then completes. If called with a predicate function, first emits the first value of the source that matches the specified condition. It may also take a resultSelector function to produce the output value from the input value, and a defaultValue to emit in case the source completes before it is able to emit a valid value. Throws an error if defaultValue was not provided and a matching element is not found.

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number, source: Observable<T>): boolean
  • optional

An optional function called with each item to test for condition matching.

resultSelector function(value: T, index: number): R
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source Observable. The arguments passed to this function are:

  • value: the value that was emitted on the source.
  • index: the "index" of the value from the source.
defaultValue R
  • optional

The default value emitted in case no valid value was found on the source.

Return:

Observable<T | R>

An Observable of the first item that matches the condition.

Throw:

EmptyError

Delivers an EmptyError to the Observer's error callback if the Observable completes before any next notification was sent.

Example:

Emit only the first click that happens on the DOM
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first();
result.subscribe(x => console.log(x));
Emits the first click that happens on a DIV
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

Test:

See:

public forEach(next: Function, PromiseCtor: PromiseConstructor): Promise source

Params:

NameTypeAttributeDescription
next Function

a handler for each value emitted by the observable

PromiseCtor PromiseConstructor
  • optional

a constructor function used to instantiate the Promise

Return:

Promise

a promise that either resolves on observable completion or rejects with the handled error

public groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>> source

Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.

Params:

NameTypeAttributeDescription
keySelector function(value: T): K

A function that extracts the key for each item.

elementSelector function(value: T): R
  • optional

A function that extracts the return element for each item.

durationSelector function(grouped: GroupedObservable<K, R>): Observable<any>
  • optional

A function that returns an Observable to determine how long each group should exist.

Return:

Observable<GroupedObservable<K, R>>

An Observable that emits GroupedObservables, each of which corresponds to a unique key value and each of which emits those items from the source Observable that share that key value.

Test:

public ignoreElements(): Observable source

Ignores all items emitted by the source Observable and only passes calls of complete or error.

Return:

Observable

An empty Observable that only calls complete or error, based on which one is called by the source Observable.

Test:

public isEmpty(): Observable source

If the source Observable is empty it returns an Observable that emits true, otherwise it emits false.

Return:

Observable

An Observable that emits a Boolean.

Test:

public last(predicate: function): Observable source

Returns an Observable that emits only the last item emitted by the source Observable. It optionally takes a predicate function as a parameter, in which case, rather than emitting the last item from the source Observable, the resulting Observable will emit the last item from the source Observable that satisfies the predicate.

Params:

NameTypeAttributeDescription
predicate function

The condition any source emitted item has to satisfy.

Return:

Observable

An Observable that emits only the last item satisfying the given condition from the source, or an NoSuchElementException if no such items are emitted.

Throw:

EmptyError

Delivers an EmptyError to the Observer's error callback if the Observable completes before any next notification was sent.

*

Throws if no items that match the predicate are emitted by the source Observable.

Test:

public letProto(func: *): Observable<R> source

Params:

NameTypeAttributeDescription
func *

Return:

Observable<R>

public lift(operator: Operator): Observable source

Creates a new Observable, with this Observable as the source, and the passed operator defined as the new observable's operator.

Params:

NameTypeAttributeDescription
operator Operator

the operator defining the operation to take on the observable

Return:

Observable

a new observable with the Operator applied

public map(project: function(value: T, index: number): R, thisArg: any): Observable<R> source

Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable.

Like Array.prototype.map(), it passes each source value through a transformation function to get corresponding output values.

Similar to the well known Array.prototype.map function, this operator applies a projection to each value and emits that projection in the output Observable.

Params:

NameTypeAttributeDescription
project function(value: T, index: number): R

The function to apply to each value emitted by the source Observable. The index parameter is the number i for the i-th emission that has happened since the subscription, starting from the number 0.

thisArg any
  • optional

An optional argument to define what this is in the project function.

Return:

Observable<R>

An Observable that emits the values from the source Observable transformed by the given project function.

Example:

Map every every click to the clientX position of that click
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));

Test:

See:

public mapTo(value: any): Observable source

Emits the given constant value on the output Observable every time the source Observable emits a value.

Like map, but it maps every source value to the same output value every time.

Takes a constant value as argument, and emits that whenever the source Observable emits a value. In other words, ignores the actual source value, and simply uses the emission moment to know when to emit the given value.

Params:

NameTypeAttributeDescription
value any

The value to map each source value to.

Return:

Observable

An Observable that emits the given value every time the source Observable emits something.

Example:

Map every every click to the string 'Hi'
var clicks = Rx.Observable.fromEvent(document, 'click');
var greetings = clicks.mapTo('Hi');
greetings.subscribe(x => console.log(x));

Test:

See:

public materialize(): Observable<Notification<T>> source

Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.

Wraps next, error and complete emissions in Notification objects, emitted as next on the output Observable.

materialize returns an Observable that emits a next notification for each next, error, or complete emission of the source Observable. When the source Observable emits complete, the output Observable will emit next as a Notification of type "complete", and then it will emit complete as well. When the source Observable emits error, the output will emit next as a Notification of type "error", and then complete.

This operator is useful for producing metadata of the source Observable, to be consumed as next emissions. Use it in conjunction with dematerialize.

Return:

Observable<Notification<T>>

An Observable that emits Notification objects that wrap the original emissions from the source Observable with metadata.

Example:

Convert a faulty Observable to an Observable of Notifications
var letters = Rx.Observable.of('a', 'b', 13, 'd');
var upperCase = letters.map(x => x.toUpperCase());
var materialized = upperCase.materialize();
materialized.subscribe(x => console.log(x));

// Results in the following:
// - Notification {kind: "N", value: "A", error: undefined, hasValue: true}
// - Notification {kind: "N", value: "B", error: undefined, hasValue: true}
// - Notification {kind: "E", value: undefined, error: TypeError:
//   x.toUpperCase is not a function at MapSubscriber.letters.map.x
//   [as project] (http://1…, hasValue: false}

Test:

See:

public max(comparer: Function): Observable source

The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the largest value.

Params:

NameTypeAttributeDescription
comparer Function
  • optional

Optional comparer function that it will use instead of its default to compare the value of two items.

Return:

Observable

An Observable that emits item with the largest value.

Example:

Get the maximal value of a series of numbers
Rx.Observable.of(5, 4, 7, 2, 8)
  .max()
  .subscribe(x => console.log(x)); // -> 8
Use a comparer function to get the maximal item
interface Person {
  age: number,
  name: string
}
Observable.of<Person>({age: 7, name: 'Foo'},
                      {age: 5, name: 'Bar'},
                      {age: 9, name: 'Beer'})
          .max<Person>((a: Person, b: Person) => a.age < b.age ? -1 : 1)
          .subscribe((x: Person) => console.log(x.name)); // -> 'Beer'
}

Test:

See:

public merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable source

Creates an output Observable which concurrently emits all values from every given input Observable.

Flattens multiple Observables together by blending their values into one Observable.

merge subscribes to each given input Observable (either the source or an Observable given as argument), and simply forwards (without doing any transformation) all the values from all the input Observables to the output Observable. The output Observable only completes once all input Observables have completed. Any error delivered by an input Observable will be immediately emitted on the output Observable.

Params:

NameTypeAttributeDescription
other ObservableInput

An input Observable to merge with the source Observable. More than one input Observables may be given as argument.

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

scheduler Scheduler
  • optional
  • default: null

The IScheduler to use for managing concurrency of input Observables.

Return:

Observable

An Observable that emits items that are the result of every input Observable.

Example:

Merge together two Observables: 1s interval and clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = clicks.merge(timer);
clicksOrTimer.subscribe(x => console.log(x));
Merge together 3 Observables, but only 2 run concurrently
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // the argument
var merged = timer1.merge(timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));

Test:

See:

public mergeAll(concurrent: number): Observable source

Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.

Flattens an Observable-of-Observables.

mergeAll subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, it subscribes to that and delivers all the values from the inner Observable on the output Observable. The output Observable only completes once all inner Observables have completed. Any error delivered by a inner Observable will be immediately emitted on the output Observable.

Params:

NameTypeAttributeDescription
concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of inner Observables being subscribed to concurrently.

Return:

Observable

An Observable that emits values coming from all the inner Observables emitted by the source Observable.

Example:

Spawn a new interval Observable for each click event, and blend their outputs as one Observable
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var firstOrder = higherOrder.mergeAll();
firstOrder.subscribe(x => console.log(x));
Count from 0 to 9 every second for each click, but only allow 2 concurrent timers
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(10));
var firstOrder = higherOrder.mergeAll(2);
firstOrder.subscribe(x => console.log(x));

Test:

See:

public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable source

Projects each source value to an Observable which is merged in the output Observable.

Maps each value to an Observable, then flattens all of these inner Observables using mergeAll.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.

Params:

NameTypeAttributeDescription
project function(value: T, ?index: number): ObservableInput

A function that, when applied to an item emitted by the source Observable, returns an Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable
concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

Return:

Observable

An Observable that emits the result of applying the projection function (and the optional resultSelector) to each item emitted by the source Observable and merging the results of the Observables obtained from this transformation.

Example:

Map and flatten each letter to an Observable ticking every 1 second
var letters = Rx.Observable.of('a', 'b', 'c');
var result = letters.mergeMap(x =>
  Rx.Observable.interval(1000).map(i => x+i)
);
result.subscribe(x => console.log(x));

// Results in the following:
// a0
// b0
// c0
// a1
// b1
// c1
// continues to list a,b,c with respective ascending integers

Test:

See:

public mergeMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable source

Projects each source value to the same Observable which is merged multiple times in the output Observable.

It's like mergeMap, but maps each value always to the same inner Observable.

Maps each source value to the given Observable innerObservable regardless of the source value, and then merges those resulting Observables into one single Observable, which is the output Observable.

Params:

NameTypeAttributeDescription
innerObservable ObservableInput

An Observable to replace each value from the source Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable
concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

Return:

Observable

An Observable that emits items from the given innerObservable (and optionally transformed through resultSelector) every time a value is emitted on the source Observable.

Example:

For each click event, start an interval Observable ticking every 1 second
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.mergeMapTo(Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public mergeScan(accumulator: function(acc: R, value: T): Observable<R>, seed: *, concurrent: number): Observable<R> source

Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable returned is merged into the output Observable.

It's like scan, but the Observables returned by the accumulator are merged into the outer Observable.

Params:

NameTypeAttributeDescription
accumulator function(acc: R, value: T): Observable<R>

The accumulator function called on each source value.

seed *

The initial accumulation value.

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

Maximum number of input Observables being subscribed to concurrently.

Return:

Observable<R>

An observable of the accumulated values.

Example:

Count the number of click events
const click$ = Rx.Observable.fromEvent(document, 'click');
const one$ = click$.mapTo(1);
const seed = 0;
const count$ = one$.mergeScan((acc, one) => Rx.Observable.of(acc + one), seed);
count$.subscribe(x => console.log(x));

// Results:
1
2
3
4
// ...and so on for each click

Test:

public min(comparer: Function): Observable<R> source

The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the smallest value.

Params:

NameTypeAttributeDescription
comparer Function
  • optional

Optional comparer function that it will use instead of its default to compare the value of two items.

Return:

Observable<R>

An Observable that emits item with the smallest value.

Example:

Get the minimal value of a series of numbers
Rx.Observable.of(5, 4, 7, 2, 8)
  .min()
  .subscribe(x => console.log(x)); // -> 2
Use a comparer function to get the minimal item
interface Person {
  age: number,
  name: string
}
Observable.of<Person>({age: 7, name: 'Foo'},
                      {age: 5, name: 'Bar'},
                      {age: 9, name: 'Beer'})
          .min<Person>( (a: Person, b: Person) => a.age < b.age ? -1 : 1)
          .subscribe((x: Person) => console.log(x.name)); // -> 'Bar'
}

Test:

See:

public multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable source

Returns an Observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable that shares a single subscription to the underlying stream.

Params:

NameTypeAttributeDescription
subjectOrSubjectFactory Function | Subject

Factory function to create an intermediate subject through which the source sequence's elements will be multicast to the selector function or Subject to push source elements into.

selector Function
  • optional

Optional selector function that can use the multicasted source stream as many times as needed, without causing multiple subscriptions to the source stream. Subscribers to the given source will receive all notifications of the source from the time of the subscription forward.

Return:

Observable

An Observable that emits the results of invoking the selector on the items emitted by a ConnectableObservable that shares a single subscription to the underlying stream.

Test:

public observeOn(scheduler: *, delay: *): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
scheduler *
delay *

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

Test:

See:

public pairwise(): Observable<Array<T>> source

Groups pairs of consecutive emissions together and emits them as an array of two values.

Puts the current value and previous value together as an array, and emits that.

The Nth emission from the source Observable will cause the output Observable to emit an array [(N-1)th, Nth] of the previous and the current value, as a pair. For this reason, pairwise emits on the second and subsequent emissions from the source Observable, but not on the first emission, because there is no previous value in that case.

Return:

Observable<Array<T>>

An Observable of pairs (as arrays) of consecutive values from the source Observable.

Example:

On every click (starting from the second), emit the relative distance to the previous click
var clicks = Rx.Observable.fromEvent(document, 'click');
var pairs = clicks.pairwise();
var distance = pairs.map(pair => {
  var x0 = pair[0].clientX;
  var y0 = pair[0].clientY;
  var x1 = pair[1].clientX;
  var y1 = pair[1].clientY;
  return Math.sqrt(Math.pow(x0 - x1, 2) + Math.pow(y0 - y1, 2));
});
distance.subscribe(x => console.log(x));

Test:

See:

public partition(predicate: function(value: T, index: number): boolean, thisArg: any): [Observable<T>, Observable<T>] source

Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don't satisfy the predicate.

It's like filter, but returns two Observables: one like the output of filter, and the other with values that did not pass the condition.

partition outputs an array with two Observables that partition the values from the source Observable through the given predicate function. The first Observable in that array emits source values for which the predicate argument returns true. The second Observable emits source values for which the predicate returns false. The first behaves like filter and the second behaves like filter with the predicate negated.

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number): boolean

A function that evaluates each value emitted by the source Observable. If it returns true, the value is emitted on the first Observable in the returned array, if false the value is emitted on the second Observable in the array. The index parameter is the number i for the i-th source emission that has happened since the subscription, starting from the number 0.

thisArg any
  • optional

An optional argument to determine the value of this in the predicate function.

Return:

[Observable<T>, Observable<T>]

An array with two Observables: one with values that passed the predicate, and another with values that did not pass the predicate.

Example:

Partition click events into those on DIV elements and those elsewhere
var clicks = Rx.Observable.fromEvent(document, 'click');
var parts = clicks.partition(ev => ev.target.tagName === 'DIV');
var clicksOnDivs = parts[0];
var clicksElsewhere = parts[1];
clicksOnDivs.subscribe(x => console.log('DIV clicked: ', x));
clicksElsewhere.subscribe(x => console.log('Other clicked: ', x));

Test:

See:

public pluck(properties: ...string): Observable source

Maps each source value (an object) to its specified nested property.

Like map, but meant only for picking one of the nested properties of every emitted object.

Given a list of strings describing a path to an object property, retrieves the value of a specified nested property from all values in the source Observable. If a property can't be resolved, it will return undefined for that value.

Params:

NameTypeAttributeDescription
properties ...string

The nested properties to pluck from each source value (an object).

Return:

Observable

A new Observable of property values from the source values.

Example:

Map every every click to the tagName of the clicked target element
var clicks = Rx.Observable.fromEvent(document, 'click');
var tagNames = clicks.pluck('target', 'tagName');
tagNames.subscribe(x => console.log(x));

Test:

See:

public publish(selector: Function): * source

Returns a ConnectableObservable, which is a variety of Observable that waits until its connect method is called before it begins emitting items to those Observers that have subscribed to it.

Params:

NameTypeAttributeDescription
selector Function
  • optional

Optional selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all notifications of the source from the time of the subscription on.

Return:

*

A ConnectableObservable that upon connection causes the source Observable to emit items to its Observers.

Test:

public publishBehavior(value: *): ConnectableObservable<T> source

Params:

NameTypeAttributeDescription
value *

Test:

public publishReplay(bufferSize: *, windowTime: *, scheduler: *): ConnectableObservable<T> source

Params:

NameTypeAttributeDescription
bufferSize *
windowTime *
scheduler *

Test:

public race(): Observable source

Returns an Observable that mirrors the first source Observable to emit an item from the combination of this Observable and supplied Observables.

Params:

NameTypeAttributeDescription
...observables ...Observables

Sources used to race for which Observable emits first.

Return:

Observable

An Observable that mirrors the output of the first Observable to emit an item.

Test:

public reduce(accumulator: function(acc: R, value: T, index: number): R, seed: R): Observable<R> source

Applies an accumulator function over the source Observable, and returns the accumulated result when the source completes, given an optional seed value.

Combines together all values emitted on the source, using an accumulator function that knows how to join a new source value into the accumulation from the past.

Like Array.prototype.reduce(), reduce applies an accumulator function against an accumulation and each value of the source Observable (from the past) to reduce it to a single value, emitted on the output Observable. Note that reduce will only emit one value, only when the source Observable completes. It is equivalent to applying operator scan followed by operator last.

Returns an Observable that applies a specified accumulator function to each item emitted by the source Observable. If a seed value is specified, then that value will be used as the initial value for the accumulator. If no seed value is specified, the first item of the source is used as the seed.

Params:

NameTypeAttributeDescription
accumulator function(acc: R, value: T, index: number): R

The accumulator function called on each source value.

seed R
  • optional

The initial accumulation value.

Return:

Observable<R>

An Observable that emits a single value that is the result of accumulating the values emitted by the source Observable.

Example:

Count the number of click events that happened in 5 seconds
var clicksInFiveSeconds = Rx.Observable.fromEvent(document, 'click')
  .takeUntil(Rx.Observable.interval(5000));
var ones = clicksInFiveSeconds.mapTo(1);
var seed = 0;
var count = ones.reduce((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));

Test:

See:

public repeat(count: number): Observable source

Returns an Observable that repeats the stream of items emitted by the source Observable at most count times.

Params:

NameTypeAttributeDescription
count number
  • optional

The number of times the source Observable items are repeated, a count of 0 will yield an empty Observable.

Return:

Observable

An Observable that repeats the stream of items emitted by the source Observable at most count times.

Test:

public repeatWhen(notifier: function(notifications: Observable): Observable): Observable source

Returns an Observable that mirrors the source Observable with the exception of a complete. If the source Observable calls complete, this method will emit to the Observable returned from notifier. If that Observable calls complete or error, then this method will call complete or error on the child subscription. Otherwise this method will resubscribe to the source Observable.

Params:

NameTypeAttributeDescription
notifier function(notifications: Observable): Observable

Receives an Observable of notifications with which a user can complete or error, aborting the repetition.

Return:

Observable

The source Observable modified with repeat logic.

Test:

public retry(count: number): Observable source

Returns an Observable that mirrors the source Observable with the exception of an error. If the source Observable calls error, this method will resubscribe to the source Observable for a maximum of count resubscriptions (given as a number parameter) rather than propagating the error call.

Any and all items emitted by the source Observable will be emitted by the resulting Observable, even those emitted during failed subscriptions. For example, if an Observable fails at first but emits [1, 2] then succeeds the second time and emits: [1, 2, 3, 4, 5] then the complete stream of emissions and notifications would be: [1, 2, 1, 2, 3, 4, 5, complete].

Params:

NameTypeAttributeDescription
count number

Number of retry attempts before failing.

Return:

Observable

The source Observable modified with the retry logic.

Test:

public retryWhen(notifier: function(errors: Observable): Observable): Observable source

Returns an Observable that mirrors the source Observable with the exception of an error. If the source Observable calls error, this method will emit the Throwable that caused the error to the Observable returned from notifier. If that Observable calls complete or error then this method will call complete or error on the child subscription. Otherwise this method will resubscribe to the source Observable.

Params:

NameTypeAttributeDescription
notifier function(errors: Observable): Observable

Receives an Observable of notifications with which a user can complete or error, aborting the retry.

Return:

Observable

The source Observable modified with retry logic.

Test:

public sample(notifier: Observable<any>): Observable<T> source

Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.

It's like sampleTime, but samples whenever the notifier Observable emits something.

Whenever the notifier Observable emits a value or completes, sample looks at the source Observable and emits whichever value it has most recently emitted since the previous sampling, unless the source has not emitted anything since the previous sampling. The notifier is subscribed to as soon as the output Observable is subscribed.

Params:

NameTypeAttributeDescription
notifier Observable<any>

The Observable to use for sampling the source Observable.

Return:

Observable<T>

An Observable that emits the results of sampling the values emitted by the source Observable whenever the notifier Observable emits value or completes.

Example:

On every click, sample the most recent "seconds" timer
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = seconds.sample(clicks);
result.subscribe(x => console.log(x));

Test:

See:

public sampleTime(period: number, scheduler: Scheduler): Observable<T> source

Emits the most recently emitted value from the source Observable within periodic time intervals.

Samples the source Observable at periodic time intervals, emitting what it samples.

sampleTime periodically looks at the source Observable and emits whichever value it has most recently emitted since the previous sampling, unless the source has not emitted anything since the previous sampling. The sampling happens periodically in time every period milliseconds (or the time unit defined by the optional scheduler argument). The sampling starts as soon as the output Observable is subscribed.

Params:

NameTypeAttributeDescription
period number

The sampling period expressed in milliseconds or the time unit determined internally by the optional scheduler.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for managing the timers that handle the sampling.

Return:

Observable<T>

An Observable that emits the results of sampling the values emitted by the source Observable at the specified time interval.

Example:

Every second, emit the most recent click at most once
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.sampleTime(1000);
result.subscribe(x => console.log(x));

Test:

See:

public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R> source

Applies an accumulator function over the source Observable, and returns each intermediate result, with an optional seed value.

It's like reduce, but emits the current accumulation whenever the source emits a value.

Combines together all values emitted on the source, using an accumulator function that knows how to join a new source value into the accumulation from the past. Is similar to reduce, but emits the intermediate accumulations.

Returns an Observable that applies a specified accumulator function to each item emitted by the source Observable. If a seed value is specified, then that value will be used as the initial value for the accumulator. If no seed value is specified, the first item of the source is used as the seed.

Params:

NameTypeAttributeDescription
accumulator function(acc: R, value: T, index: number): R

The accumulator function called on each source value.

seed T | R
  • optional

The initial accumulation value.

Return:

Observable<R>

An observable of the accumulated values.

Example:

Count the number of click events
var clicks = Rx.Observable.fromEvent(document, 'click');
var ones = clicks.mapTo(1);
var seed = 0;
var count = ones.scan((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));

Test:

See:

public sequenceEqual(compareTo: Observable, comparor: function): Observable source

Compares all values of two observables in sequence using an optional comparor function and returns an observable of a single boolean value representing whether or not the two sequences are equal.

Checks to see of all values emitted by both observables are equal, in order.

sequenceEqual subscribes to two observables and buffers incoming values from each observable. Whenever either observable emits a value, the value is buffered and the buffers are shifted and compared from the bottom up; If any value pair doesn't match, the returned observable will emit false and complete. If one of the observables completes, the operator will wait for the other observable to complete; If the other observable emits before completing, the returned observable will emit false and complete. If one observable never completes or emits after the other complets, the returned observable will never complete.

Params:

NameTypeAttributeDescription
compareTo Observable

The observable sequence to compare the source sequence to.

comparor function
  • optional

An optional function to compare each value pair

Return:

Observable

An Observable of a single boolean value representing whether or not the values emitted by both observables were equal in sequence.

Example:

figure out if the Konami code matches
var code = Rx.Observable.from([
 "ArrowUp",
 "ArrowUp",
 "ArrowDown",
 "ArrowDown",
 "ArrowLeft",
 "ArrowRight",
 "ArrowLeft",
 "ArrowRight",
 "KeyB",
 "KeyA",
 "Enter" // no start key, clearly.
]);

var keys = Rx.Observable.fromEvent(document, 'keyup')
 .map(e => e.code);
var matches = keys.bufferCount(11, 1)
 .mergeMap(
   last11 =>
     Rx.Observable.from(last11)
       .sequenceEqual(code)
  );
matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));

Test:

See:

public share(): Observable<T> source

Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream hot. This is an alias for .publish().refCount().

Return:

Observable<T>

An Observable that upon connection causes the source Observable to emit items to its Observers.

Test:

public single(predicate: Function): Observable<T> source

Returns an Observable that emits the single item emitted by the source Observable that matches a specified predicate, if that Observable emits one such item. If the source Observable emits more than one such item or no such items, notify of an IllegalArgumentException or NoSuchElementException respectively.

Params:

NameTypeAttributeDescription
predicate Function

A predicate function to evaluate items emitted by the source Observable.

Return:

Observable<T>

An Observable that emits the single item emitted by the source Observable that matches the predicate. .

Throw:

EmptyError

Delivers an EmptyError to the Observer's error callback if the Observable completes before any next notification was sent.

Test:

public skip(count: Number): Observable source

Returns an Observable that skips the first count items emitted by the source Observable.

Params:

NameTypeAttributeDescription
count Number

The number of times, items emitted by source Observable should be skipped.

Return:

Observable

An Observable that skips values emitted by the source Observable.

Test:

public skipUntil(notifier: Observable): Observable<T> source

Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.

Params:

NameTypeAttributeDescription
notifier Observable

The second Observable that has to emit an item before the source Observable's elements begin to be mirrored by the resulting Observable.

Return:

Observable<T>

An Observable that skips items from the source Observable until the second Observable emits an item, then emits the remaining items.

Test:

public skipWhile(predicate: Function): Observable<T> source

Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.

Params:

NameTypeAttributeDescription
predicate Function

A function to test each item emitted from the source Observable.

Return:

Observable<T>

An Observable that begins emitting items emitted by the source Observable when the specified predicate becomes false.

Test:

public startWith(values: ...T, scheduler: Scheduler): Observable source

Returns an Observable that emits the items you specify as arguments before it begins to emit items emitted by the source Observable.

Params:

NameTypeAttributeDescription
values ...T

Items you want the modified Observable to emit first.

scheduler Scheduler
  • optional

A IScheduler to use for scheduling the emissions of the next notifications.

Return:

Observable

An Observable that emits the items in the specified Iterable and then emits the items emitted by the source Observable.

Test:

public subscribeOn(scheduler: Scheduler): Observable<T> source

Asynchronously subscribes Observers to this Observable on the specified IScheduler.

Params:

NameTypeAttributeDescription
scheduler Scheduler

The IScheduler to perform subscription actions on.

Return:

Observable<T>

The source Observable modified so that its subscriptions happen on the specified IScheduler. .

Test:

public switch(): Observable<T> source

Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables.

Flattens an Observable-of-Observables by dropping the previous inner Observable once a new one appears.

switch subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, the output Observable subscribes to the inner Observable and begins emitting the items emitted by that. So far, it behaves like mergeAll. However, when a new inner Observable is emitted, switch unsubscribes from the earlier-emitted inner Observable and subscribes to the new inner Observable and begins emitting items from it. It continues to behave like this for subsequent inner Observables.

Return:

Observable<T>

An Observable that emits the items emitted by the Observable most recently emitted by the source Observable.

Example:

Rerun an interval Observable on every click event
var clicks = Rx.Observable.fromEvent(document, 'click');
// Each click event is mapped to an Observable that ticks every second
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var switched = higherOrder.switch();
// The outcome is that `switched` is essentially a timer that restarts
// on every click. The interval Observables from older clicks do not merge
// with the current interval Observable.
switched.subscribe(x => console.log(x));

Test:

See:

public switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.

Maps each value to an Observable, then flattens all of these inner Observables using switch.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an (so-called "inner") Observable. Each time it observes one of these inner Observables, the output Observable begins emitting the items emitted by that inner Observable. When a new inner Observable is emitted, switchMap stops emitting items from the earlier-emitted inner Observable and begins emitting items from the new one. It continues to behave like this for subsequent inner Observables.

Params:

NameTypeAttributeDescription
project function(value: T, ?index: number): ObservableInput

A function that, when applied to an item emitted by the source Observable, returns an Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

Return:

Observable

An Observable that emits the result of applying the projection function (and the optional resultSelector) to each item emitted by the source Observable and taking only the values from the most recently projected inner Observable.

Example:

Rerun an interval Observable on every click event
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.switchMap((ev) => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public switchMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

Projects each source value to the same Observable which is flattened multiple times with switch in the output Observable.

It's like switchMap, but maps each value always to the same inner Observable.

Maps each source value to the given Observable innerObservable regardless of the source value, and then flattens those resulting Observables into one single Observable, which is the output Observable. The output Observables emits values only from the most recently emitted instance of innerObservable.

Params:

NameTypeAttributeDescription
innerObservable ObservableInput

An Observable to replace each value from the source Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are:

  • outerValue: the value that came from the source
  • innerValue: the value that came from the projected Observable
  • outerIndex: the "index" of the value that came from the source
  • innerIndex: the "index" of the value from the projected Observable

Return:

Observable

An Observable that emits items from the given innerObservable (and optionally transformed through resultSelector) every time a value is emitted on the source Observable, and taking only the values from the most recently projected inner Observable.

Example:

Rerun an interval Observable on every click event
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.switchMapTo(Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public take(count: number): Observable<T> source

Emits only the first count values emitted by the source Observable.

Takes the first count values from the source, then completes.

take returns an Observable that emits only the first count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. After that, it completes, regardless if the source completes.

Params:

NameTypeAttributeDescription
count number

The maximum number of next values to emit.

Return:

Observable<T>

An Observable that emits only the first count values emitted by the source Observable, or all of the values from the source if the source emits fewer than count values.

Throw:

ArgumentOutOfRangeError

When using take(i), it delivers an ArgumentOutOrRangeError to the Observer's error callback if i < 0.

Example:

Take the first 5 seconds of an infinite 1-second interval Observable
var interval = Rx.Observable.interval(1000);
var five = interval.take(5);
five.subscribe(x => console.log(x));

Test:

See:

public takeLast(count: number): Observable<T> source

Emits only the last count values emitted by the source Observable.

Remembers the latest count values, then emits those only when the source completes.

takeLast returns an Observable that emits at most the last count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. This operator must wait until the complete notification emission from the source in order to emit the next values on the output Observable, because otherwise it is impossible to know whether or not more values will be emitted on the source. For this reason, all values are emitted synchronously, followed by the complete notification.

Params:

NameTypeAttributeDescription
count number

The maximum number of values to emit from the end of the sequence of values emitted by the source Observable.

Return:

Observable<T>

An Observable that emits at most the last count values emitted by the source Observable.

Throw:

ArgumentOutOfRangeError

When using takeLast(i), it delivers an ArgumentOutOrRangeError to the Observer's error callback if i < 0.

Example:

Take the last 3 values of an Observable with many values
var many = Rx.Observable.range(1, 100);
var lastThree = many.takeLast(3);
lastThree.subscribe(x => console.log(x));

Test:

See:

public takeUntil(notifier: Observable): Observable<T> source

Emits the values emitted by the source Observable until a notifier Observable emits a value.

Lets values pass until a second Observable, notifier, emits something. Then, it completes.

takeUntil subscribes and begins mirroring the source Observable. It also monitors a second Observable, notifier that you provide. If the notifier emits a value or a complete notification, the output Observable stops mirroring the source Observable and completes.

Params:

NameTypeAttributeDescription
notifier Observable

The Observable whose first emitted value will cause the output Observable of takeUntil to stop emitting values from the source Observable.

Return:

Observable<T>

An Observable that emits the values from the source Observable until such time as notifier emits its first value.

Example:

Tick every second until the first click happens
var interval = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = interval.takeUntil(clicks);
result.subscribe(x => console.log(x));

Test:

See:

public takeWhile(predicate: function(value: T, index: number): boolean): Observable<T> source

Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.

Takes values from the source only while they pass the condition given. When the first value does not satisfy, it completes.

takeWhile subscribes and begins mirroring the source Observable. Each value emitted on the source is given to the predicate function which returns a boolean, representing a condition to be satisfied by the source values. The output Observable emits the source values until such time as the predicate returns false, at which point takeWhile stops mirroring the source Observable and completes the output Observable.

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number): boolean

A function that evaluates a value emitted by the source Observable and returns a boolean. Also takes the (zero-based) index as the second argument.

Return:

Observable<T>

An Observable that emits the values from the source Observable so long as each value satisfies the condition defined by the predicate, then completes.

Example:

Emit click events only while the clientX property is greater than 200
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.takeWhile(ev => ev.clientX > 200);
result.subscribe(x => console.log(x));

Test:

See:

public throttle(durationSelector: function(value: T): SubscribableOrPromise): Observable<T> source

Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.

It's like throttleTime, but the silencing duration is determined by a second Observable.

throttle emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, it is forwarded to the output Observable, and then the timer is enabled by calling the durationSelector function with the source value, which returns the "duration" Observable. When the duration Observable emits a value or completes, the timer is disabled, and this process repeats for the next source value.

Params:

NameTypeAttributeDescription
durationSelector function(value: T): SubscribableOrPromise

A function that receives a value from the source Observable, for computing the silencing duration for each source value, returned as an Observable or a Promise.

Return:

Observable<T>

An Observable that performs the throttle operation to limit the rate of emissions from the source.

Example:

Emit clicks at a rate of at most one click per second
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttle(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public throttleTime(duration: number, scheduler: Scheduler): Observable<T> source

Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.

Lets a value pass, then ignores source values for the next duration milliseconds.

throttleTime emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, it is forwarded to the output Observable, and then the timer is enabled. After duration milliseconds (or the time unit determined internally by the optional scheduler) has passed, the timer is disabled, and this process repeats for the next source value. Optionally takes a IScheduler for managing timers.

Params:

NameTypeAttributeDescription
duration number

Time to wait before emitting another value after emitting the last value, measured in milliseconds or the time unit determined internally by the optional scheduler.

scheduler Scheduler
  • optional
  • default: async

The IScheduler to use for managing the timers that handle the sampling.

Return:

Observable<T>

An Observable that performs the throttle operation to limit the rate of emissions from the source.

Example:

Emit clicks at a rate of at most one click per second
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttleTime(1000);
result.subscribe(x => console.log(x));

Test:

See:

public timeInterval(scheduler: *): Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
scheduler *

Return:

Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T>

Test:

public timeout(due: number, scheduler: Scheduler): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
due number
scheduler Scheduler
  • optional

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

public timeoutWith(due: *, withObservable: *, scheduler: *): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
due *
withObservable *
scheduler *

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

Test:

public timestamp(scheduler: *): Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
scheduler *

Return:

Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T>

Test:

public toArray(): Observable<any[]> | WebSocketSubject<T> | Observable<T> source

Return:

Observable<any[]> | WebSocketSubject<T> | Observable<T>

Test:

public toPromise(PromiseCtor: *): Promise<T> source

Converts an Observable sequence to a ES2015 compliant promise.

Params:

NameTypeAttributeDescription
PromiseCtor *

promise The constructor of the promise. If not provided, it will look for a constructor first in Rx.config.Promise then fall back to the native Promise constructor if available.

Return:

Promise<T>

An ES2015 compatible promise with the last value from the observable sequence.

Example:

// Using normal ES2015
let source = Rx.Observable
  .just(42)
  .toPromise();

source.then((value) => console.log('Value: %s', value));
// => Value: 42

// Rejected Promise
// Using normal ES2015
let source = Rx.Observable
  .throw(new Error('woops'))
  .toPromise();

source
  .then((value) => console.log('Value: %s', value))
  .catch((err) => console.log('Error: %s', err));
// => Error: Error: woops

// Setting via the config
Rx.config.Promise = RSVP.Promise;

let source = Rx.Observable
  .of(42)
  .toPromise();

source.then((value) => console.log('Value: %s', value));
// => Value: 42

// Setting via the method
let source = Rx.Observable
  .just(42)
  .toPromise(RSVP.Promise);

source.then((value) => console.log('Value: %s', value));
// => Value: 42

Test:

public window(windowBoundaries: Observable<any>): Observable<Observable<T>> source

Branch out the source Observable values as a nested Observable whenever windowBoundaries emits.

It's like buffer, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits connected, non-overlapping windows. It emits the current window and opens a new one whenever the Observable windowBoundaries emits an item. Because each window is an Observable, the output is a higher-order Observable.

Params:

NameTypeAttributeDescription
windowBoundaries Observable<any>

An Observable that completes the previous window and starts a new window.

Return:

Observable<Observable<T>>

An Observable of windows, which are Observables emitting values of the source Observable.

Example:

In every window of 1 second each, emit at most 2 click events
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var result = clicks.window(interval)
  .map(win => win.take(2)) // each window has at most 2 emissions
  .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));

Test:

See:

public windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>> source

Branch out the source Observable values as a nested Observable with each nested Observable emitting at most windowSize values.

It's like bufferCount, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits windows every startWindowEvery items, each containing no more than windowSize items. When the source Observable completes or encounters an error, the output Observable emits the current window and propagates the notification from the source Observable. If startWindowEvery is not provided, then new windows are started immediately at the start of the source and when each window completes with size windowSize.

Params:

NameTypeAttributeDescription
windowSize number

The maximum number of values emitted by each window.

startWindowEvery number
  • optional

Interval at which to start a new window. For example if startWindowEvery is 2, then a new window will be started on every other value from the source. A new window is started at the beginning of the source by default.

Return:

Observable<Observable<T>>

An Observable of windows, which in turn are Observable of values.

Example:

Ignore every 3rd click event, starting from the first one
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowCount(3)
  .map(win => win.skip(1)) // skip first of every 3 clicks
  .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));
Ignore every 3rd click event, starting from the third one
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowCount(2, 3)
  .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));

Test:

See:

public windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>> source

Branch out the source Observable values as a nested Observable starting from an emission from openings and ending when the output of closingSelector emits.

It's like bufferToggle, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits windows that contain those items emitted by the source Observable between the time when the openings Observable emits an item and when the Observable returned by closingSelector emits an item.

Params:

NameTypeAttributeDescription
openings Observable<O>

An observable of notifications to start new windows.

closingSelector function(value: O): Observable

A function that takes the value emitted by the openings observable and returns an Observable, which, when it emits (either next or complete), signals that the associated window should complete.

Return:

Observable<Observable<T>>

An observable of windows, which in turn are Observables.

Example:

Every other second, emit the click events from the next 500ms
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var result = clicks.windowToggle(openings, i =>
  i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
).mergeAll();
result.subscribe(x => console.log(x));

Test:

See:

public windowWhen(closingSelector: function(): Observable): Observable<Observable<T>> source

Branch out the source Observable values as a nested Observable using a factory function of closing Observables to determine when to start a new window.

It's like bufferWhen, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits connected, non-overlapping windows. It emits the current window and opens a new one whenever the Observable produced by the specified closingSelector function emits an item. The first window is opened immediately when subscribing to the output Observable.

Params:

NameTypeAttributeDescription
closingSelector function(): Observable

A function that takes no arguments and returns an Observable that signals (on either next or complete) when to close the previous window and start a new one.

Return:

Observable<Observable<T>>

An observable of windows, which in turn are Observables.

Example:

Emit only the first two clicks events in every window of [1-5] random seconds
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks
  .windowWhen(() => Rx.Observable.interval(1000 + Math.random() * 4000))
  .map(win => win.take(2)) // each window has at most 2 emissions
  .mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));

Test:

See:

public withLatestFrom(other: ObservableInput, project: Function): Observable source

Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.

Whenever the source Observable emits a value, it computes a formula using that value plus the latest values from other input Observables, then emits the output of that formula.

withLatestFrom combines each value from the source Observable (the instance) with the latest values from the other input Observables only when the source emits a value, optionally using a project function to determine the value to be emitted on the output Observable. All input Observables must emit at least one value before the output Observable will emit a value.

Params:

NameTypeAttributeDescription
other ObservableInput

An input Observable to combine with the source Observable. More than one input Observables may be given as argument.

project Function
  • optional

Projection function for combining values together. Receives all values in order of the Observables passed, where the first parameter is a value from the source Observable. (e.g. a.withLatestFrom(b, c, (a1, b1, c1) => a1 + b1 + c1)). If this is not passed, arrays will be emitted on the output Observable.

Return:

Observable

An Observable of projected values from the most recent values from each input Observable, or an array of the most recent values from each input Observable.

Example:

On every click event, emit an array with the latest timer event plus the click event
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var result = clicks.withLatestFrom(timer);
result.subscribe(x => console.log(x));

Test:

See:

public zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
project *

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

Test:

public zipProto(observables: *): Observable<R> source

Params:

NameTypeAttributeDescription
observables *

Return:

Observable<R>