Observable
A representation of any set of values over any amount of time. This is the most basic building block of RxJS.
Static Method Summary
Static Public Methods | ||
public static |
bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable Converts a callback API to a function that returns an Observable. |
|
public static |
bindNodeCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable Converts a Node.js-style callback API to a function that returns an Observable. |
|
public static |
combineLatest(observable1: ObservableInput, observable2: ObservableInput, project: function, scheduler: Scheduler): Observable Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables. |
|
public static |
concat(input1: ObservableInput, input2: ObservableInput, scheduler: Scheduler): Observable Creates an output Observable which sequentially emits all values from given Observable and then moves on to the next. |
|
public static |
create(onSubscription: function(observer: Observer): TeardownLogic): Observable Creates a new Observable, that will execute the specified function when an Observer subscribes to it. |
|
public static |
defer(observableFactory: function(): SubscribableOrPromise): Observable Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer. |
|
public static |
empty(scheduler: Scheduler): Observable Creates an Observable that emits no items to the Observer and immediately emits a complete notification. |
|
public static |
forkJoin(sources: ...SubscribableOrPromise, project: function): Observable Joins last values emitted by passed Observables. |
|
public static |
from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T> Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object. |
|
public static |
fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T> Creates an Observable that emits events of a specific type coming from the given event target. |
|
public static |
fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function, signal?: any): void, selector: function(...args: any): T): Observable<T> Creates an Observable from an API based on addHandler/removeHandler functions. |
|
public static |
fromPromise(promise: PromiseLike<T>, scheduler: Scheduler): Observable<T> Converts a Promise to an Observable. |
|
public static |
interval(period: number, scheduler: Scheduler): Observable Creates an Observable that emits sequential numbers every specified interval of time, on a specified IScheduler. |
|
public static |
merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable Creates an output Observable which concurrently emits all values from every given input Observable. |
|
public static |
never(): Observable Creates an Observable that emits no items to the Observer. |
|
public static |
of(values: ...T, scheduler: Scheduler): Observable<T> Creates an Observable that emits some values you specify as arguments, immediately one after the other, and then emits a complete notification. |
|
public static |
range(start: number, count: number, scheduler: Scheduler): Observable Creates an Observable that emits a sequence of numbers within a specified range. |
|
public static |
throw(error: any, scheduler: Scheduler): Observable Creates an Observable that emits no items to the Observer and immediately emits an error notification. |
|
public static |
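timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable Creates an Observable that starts emitting after an initialDelay and emits ever increasing numbers after each period of time thereafter. |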
|
public static |
webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject Wrapper around the w3c-compatible WebSocket object provided by the browser. |
|
public static |
zip(observables: *): Observable<R> Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables. |
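The creation methods summarized above share one idea: each returns something that pushes values to an observer only once it is subscribed to. The following is a minimal sketch of that idea for of, range, and concat. It is not the RxJS implementation: the names MiniObserver, MiniSource, miniOf, miniRange, miniConcat, and collect are hypothetical, and the sketch assumes synchronous emission and ignores schedulers, errors, and unsubscription.

```typescript
// Hypothetical minimal types for illustration; not part of RxJS.
interface MiniObserver<T> {
  next(value: T): void;
  complete(): void;
}

// A "source" is a function that pushes values to an observer when called.
type MiniSource<T> = (observer: MiniObserver<T>) => void;

// Like `of`: emit the given values one after the other, then complete.
function miniOf<T>(...values: T[]): MiniSource<T> {
  return (observer) => {
    for (const v of values) observer.next(v);
    observer.complete();
  };
}

// Like `range`: emit `count` sequential integers starting at `start`.
function miniRange(start: number, count: number): MiniSource<number> {
  return (observer) => {
    for (let i = 0; i < count; i++) observer.next(start + i);
    observer.complete();
  };
}

// Like `concat`: run each source to completion before starting the next.
function miniConcat<T>(...sources: MiniSource<T>[]): MiniSource<T> {
  return (observer) => {
    const runFrom = (index: number): void => {
      if (index >= sources.length) {
        observer.complete();
        return;
      }
      sources[index]({
        next: (v) => observer.next(v),
        // When one source completes, move on to the next one.
        complete: () => runFrom(index + 1),
      });
    };
    runFrom(0);
  };
}

// Helper for the example: collect everything a synchronous source emits.
function collect<T>(source: MiniSource<T>): T[] {
  const out: T[] = [];
  source({ next: (v) => out.push(v), complete: () => {} });
  return out;
}

const concatenated = collect(miniConcat(miniOf(1, 2), miniRange(10, 3)));
```

Here concatenated holds the values of the first source followed by those of the second, which mirrors the "sequentially emits all values ... and then moves on to the next" wording of concat.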
Constructor Summary
Public Constructor | ||
public |
constructor(subscribe: Function) |
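The constructor takes a subscribe function that is stored and only run when an Observer subscribes. The following sketch illustrates that laziness under simplifying assumptions. MiniObservable, MiniObserver, and Teardown are hypothetical names, not RxJS types, and unlike RxJS this sketch runs the teardown only on an explicit unsubscribe call.

```typescript
// Hypothetical minimal types for illustration; not part of RxJS.
interface MiniObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

type Teardown = () => void;

class MiniObservable<T> {
  private readonly onSubscribe: (observer: MiniObserver<T>) => Teardown;

  // The subscribe function is stored, not run: nothing happens until subscribe().
  constructor(onSubscribe: (observer: MiniObserver<T>) => Teardown) {
    this.onSubscribe = onSubscribe;
  }

  subscribe(observer: MiniObserver<T>): { unsubscribe: Teardown } {
    let closed = false;
    // Guard the observer so nothing is delivered after error, complete, or unsubscribe.
    const safe: MiniObserver<T> = {
      next: (v) => { if (!closed) observer.next(v); },
      error: (e) => { if (!closed) { closed = true; observer.error(e); } },
      complete: () => { if (!closed) { closed = true; observer.complete(); } },
    };
    const teardown = this.onSubscribe(safe);
    return { unsubscribe: () => { closed = true; teardown(); } };
  }
}

const log: string[] = [];
let executions = 0;

const numbers = new MiniObservable<number>((observer) => {
  executions++;
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // Ignored by the guard: the subscription is already complete.
  return () => { log.push("teardown"); };
});

// Constructing the observable did not run the subscribe function.
const executionsBeforeSubscribe = executions;

const subscription = numbers.subscribe({
  next: (v) => { log.push(`next ${v}`); },
  error: (e) => { log.push(`error ${String(e)}`); },
  complete: () => { log.push("complete"); },
});
subscription.unsubscribe();
```

Each call to subscribe runs the stored function again, so every Observer gets its own independent execution.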
Method Summary
Public Methods | ||
public |
An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable |
|
public |
audit(durationSelector: function(value: T): SubscribableOrPromise): Observable<T> Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process. |
|
public |
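auditTime(duration: number, scheduler: Scheduler): Observable<T> Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process. |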
|
public |
buffer(closingNotifier: Observable<any>): Observable<T[]> Buffers the source Observable values until closingNotifier emits. |
|
public |
bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]> Buffers the source Observable values until the size hits the maximum bufferSize given. |
|
public |
bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]> Buffers the source Observable values for a specific time period. |
|
public |
bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]> Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits. |
|
public |
bufferWhen(closingSelector: function(): Observable): Observable<T[]> Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer. |
|
public |
catch(selector: function): Observable Catches errors on the observable to be handled by returning a new observable or throwing an error. |
|
public |
combineAll(project: function): Observable Converts a higher-order Observable into a first-order Observable by waiting for the outer Observable to complete, then applying combineLatest. |
|
public |
combineLatest(other: ObservableInput, project: function): Observable Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables. |
|
public |
concat(other: ObservableInput, scheduler: Scheduler): Observable Creates an output Observable which sequentially emits all values from every given input Observable after the current Observable. |
|
public |
concatAll(): Observable Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order. |
|
public |
concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next. |
|
public |
concatMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable Projects each source value to the same Observable which is merged multiple times in a serialized fashion on the output Observable. |
|
public |
count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable Counts the number of emissions on the source and emits that number when the source completes. |
|
public |
debounce(durationSelector: function(value: T): SubscribableOrPromise): Observable Emits a value from the source Observable only after a particular time span determined by another Observable has passed without another source emission. |
|
public |
debounceTime(dueTime: number, scheduler: Scheduler): Observable Emits a value from the source Observable only after a particular time span has passed without another source emission. |
|
public |
defaultIfEmpty(defaultValue: any): Observable Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable. |
|
public |
delay(delay: number | Date, scheduler: Scheduler): Observable Delays the emission of items from the source Observable by a given timeout or until a given Date. |
|
public |
delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable Delays the emission of items from the source Observable by a given time span determined by the emissions of another Observable. |
|
public |
dematerialize(): Observable Converts an Observable of Notification objects into the emissions that they represent. |
|
public |
distinct(keySelector: function, flushes: Observable): Observable Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. |
|
public |
distinctUntilChanged(compare: function): Observable Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item. |
|
public |
distinctUntilKeyChanged(key: string, compare: function): Observable Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item, using a property accessed by using the key provided to check if the two items are distinct. |
|
public |
Perform a side effect for every emission on the source Observable, but return an Observable that is identical to the source. |
|
public |
elementAt(index: number, defaultValue: T): Observable Emits the single value at the specified index in a sequence of emissions from the source Observable. |
|
public |
every(predicate: function, thisArg: any): Observable Returns an Observable that emits whether or not every item of the source satisfies the condition specified. |
|
public |
exhaust(): Observable Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed. |
|
public |
exhaustMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed. |
|
public |
expand(project: function(value: T, index: number), concurrent: number, scheduler: Scheduler): Observable Recursively projects each source value to an Observable which is merged in the output Observable. |
|
public |
filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable Filter items emitted by the source Observable by only emitting those that satisfy a specified predicate. |
|
public |
finalize(callback: function): Observable Returns an Observable that mirrors the source Observable, but will call a specified function when the source terminates on complete or error. |
|
public |
find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T> Emits only the first value emitted by the source Observable that meets some condition. |
|
public |
findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable Emits only the index of the first value emitted by the source Observable that meets some condition. |
|
public |
first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R> Emits only the first value (or the first value that meets some condition) emitted by the source Observable. |
|
public |
groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>> Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group. |
|
public |
ignoreElements(): Observable Ignores all items emitted by the source Observable and only passes calls of complete or error. |
|
public |
isEmpty(): Observable If the source Observable is empty it returns an Observable that emits true, otherwise it emits false. |
|
public |
last(predicate: function): Observable Returns an Observable that emits only the last item emitted by the source Observable. |
|
public |
letProto(func: *): Observable<R> |
|
public |
lift(operator: Operator): Observable Creates a new Observable, with this Observable as the source, and the passed operator defined as the new observable's operator. |
|
public |
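map(project: function(value: T, index: number): R, thisArg: any): Observable<R> Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable. |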
|
public |
mapTo(value: any): Observable Emits the given constant value on the output Observable every time the source Observable emits a value. |
|
public |
materialize(): Observable<Notification<T>> Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects. |
|
public |
max(comparer: Function): Observable The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the largest value. |
|
public |
merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable Creates an output Observable which concurrently emits all values from every given input Observable. |
|
public |
mergeAll(concurrent: number): Observable Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables. |
|
public |
mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable Projects each source value to an Observable which is merged in the output Observable. |
|
public |
mergeMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable Projects each source value to the same Observable which is merged multiple times in the output Observable. |
|
public |
mergeScan(accumulator: function(acc: R, value: T): Observable<R>, seed: *, concurrent: number): Observable<R> Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable returned is merged into the output Observable. |
|
public |
min(comparer: Function): Observable<R> The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the smallest value. |
|
public |
multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable Returns an Observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable that shares a single subscription to the underlying stream. |
|
public |
multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable<T> | ConnectableObservable<T> Allows source Observable to be subscribed only once with a Subject of choice, while still sharing its values between multiple subscribers. |
|
public |
observeOn(scheduler: IScheduler, delay: number): Observable<T> Re-emits all notifications from source Observable with specified scheduler. |
|
public |
onErrorResumeNext(observables: ...ObservableInput): Observable When any of the provided Observables emits a complete or error notification, it immediately subscribes to the next one that was passed. |
|
public |
pairwise(): Observable<Array<T>> Groups pairs of consecutive emissions together and emits them as an array of two values. |
|
public |
pipe(operations: ...*): Observable Used to stitch together functional operators into a chain. |
|
public |
pluck(properties: ...string): Observable Maps each source value (an object) to its specified nested property. |
|
public |
Returns a ConnectableObservable, which is a variety of Observable that waits until its connect method is called before it begins emitting items to those Observers that have subscribed to it. |
|
public |
publishBehavior(value: *): ConnectableObservable<T> |
|
public |
publishReplay(bufferSize: *, windowTime: *, selectorOrScheduler: *, scheduler: *): Observable<T> | ConnectableObservable<T> |
|
public |
race(): Observable Returns an Observable that mirrors the first source Observable to emit an item from the combination of this Observable and supplied Observables. |
|
public |
reduce(accumulator: function(acc: R, value: T, index: number): R, seed: R): Observable<R> Applies an accumulator function over the source Observable, and returns the accumulated result when the source completes, given an optional seed value. |
|
public |
repeat(count: number): Observable Returns an Observable that repeats the stream of items emitted by the source Observable at most count times. |
|
public |
repeatWhen(notifier: function(notifications: Observable): Observable): Observable Returns an Observable that mirrors the source Observable with the exception of a complete notification. |
|
public |
retry(count: number): Observable Returns an Observable that mirrors the source Observable with the exception of an error notification. |
|
public |
retryWhen(notifier: function(errors: Observable): Observable): Observable Returns an Observable that mirrors the source Observable with the exception of an error notification. |
|
public |
sample(notifier: Observable<any>): Observable<T> Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits. |
|
public |
sampleTime(period: number, scheduler: Scheduler): Observable<T> Emits the most recently emitted value from the source Observable within periodic time intervals. |
|
public |
scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R> Applies an accumulator function over the source Observable, and returns each intermediate result, with an optional seed value. |
|
public |
sequenceEqual(compareTo: Observable, comparor: function): Observable Compares all values of two observables in sequence using an optional comparor function and returns an observable of a single boolean value representing whether or not the two sequences are equal. |
|
public |
share(): Observable<T> Returns a new Observable that multicasts (shares) the original Observable. |
|
public |
shareReplay(bufferSize: *, windowTime: *, scheduler: *): * |
|
public |
single(predicate: Function): Observable<T> Returns an Observable that emits the single item emitted by the source Observable that matches a specified predicate, if that Observable emits one such item. |
|
public |
skip(count: Number): Observable Returns an Observable that skips the first count items emitted by the source Observable. |
|
public |
skipLast(count: number): Observable<T> Skip the last count values emitted by the source Observable. |
|
public |
skipUntil(notifier: Observable): Observable<T> Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. |
|
public |
skipWhile(predicate: Function): Observable<T> Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false. |
|
public |
startWith(values: ...T, scheduler: Scheduler): Observable Returns an Observable that emits the items you specify as arguments before it begins to emit items emitted by the source Observable. |
|
public |
Invokes an execution of an Observable and registers Observer handlers for notifications it will emit. |
|
public |
subscribeOn(scheduler: Scheduler): Observable<T> Asynchronously subscribes Observers to this Observable on the specified IScheduler. |
|
public |
switch(): Observable<T> Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables. |
|
public |
switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable. |
|
public |
switchMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable Projects each source value to the same Observable which is flattened multiple times with switch in the output Observable. |
|
public |
take(count: number): Observable<T> Emits only the first count values emitted by the source Observable. |
|
public |
takeLast(count: number): Observable<T> Emits only the last count values emitted by the source Observable. |
|
public |
takeUntil(notifier: Observable): Observable<T> Emits the values emitted by the source Observable until a notifier Observable emits a value. |
|
public |
takeWhile(predicate: function(value: T, index: number): boolean): Observable<T> Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied. |
|
public |
throttle(durationSelector: function(value: T): SubscribableOrPromise, config: Object): Observable<T> Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process. |
|
public |
throttleTime(duration: number, scheduler: Scheduler): Observable<T> Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process. |
|
public |
timeInterval(scheduler: *): Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T> |
|
public |
timeout(due: number | Date, scheduler: Scheduler): Observable<T> Errors if Observable does not emit a value in given time span. |
|
public |
timeoutWith(due: number | Date, withObservable: Observable<T>, scheduler: Scheduler): Observable<T> Errors if Observable does not emit a value in given time span, in case of which subscribes to the second Observable. |
|
public |
timestamp(scheduler: *): Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T> |
|
public |
toArray(): Observable<any[]> | WebSocketSubject<T> | Observable<T> Collects all source emissions and emits them as an array when the source completes. |
|
public |
window(windowBoundaries: Observable<any>): Observable<Observable<T>> Branch out the source Observable values as a nested Observable whenever windowBoundaries emits. |
|
public |
windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>> Branch out the source Observable values as a nested Observable with each nested Observable emitting at most windowSize values. |
|
public |
windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>> Branch out the source Observable values as a nested Observable starting from an emission from openings and ending when the output of closingSelector emits. |
|
public |
windowWhen(closingSelector: function(): Observable): Observable<Observable<T>> Branch out the source Observable values as a nested Observable using a factory function of closing Observables to determine when to start a new window. |
|
public |
withLatestFrom(other: ObservableInput, project: Function): Observable Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits. |
|
public |
zip(observables: *): Observable<R> |
|
public |
zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T> |
|
public |
zipProto(observables: *): Observable<R> |
Static Public Methods
public static bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable source
Converts a callback API to a function that returns an Observable.
Give it a function f of type f(x, callback) and it will return a function g that, when called as g(x), will output an Observable.
bindCallback is not an operator because its input and output are not Observables. The input is a function func with some parameters; the last parameter must be a callback function that func calls when it is done.
The output of bindCallback is a function that takes the same parameters as func, except the last one (the callback). When the output function is called with arguments it will return an Observable. If func calls its callback with one argument, the Observable will emit that value. If, on the other hand, the callback is called with multiple values, the resulting Observable will emit an array containing those values.
It is very important to remember that the input function func is not called when the output function is, but rather when the Observable returned by the output function is subscribed. This means that if func makes an AJAX request, that request will be made every time someone subscribes to the resulting Observable, but not before.
Optionally, a selector function can be passed to bindCallback. The selector function takes the same arguments as the callback and returns the value that will be emitted by the Observable. Even though by default multiple arguments passed to the callback appear in the stream as an array, the selector function will be called with the arguments directly, just as the callback would. This means you can imagine the default selector (when one is not provided explicitly) as a function that aggregates all its arguments into an array, or simply returns the first argument if there is only one.
The last optional parameter, scheduler, can be used to control when the call to func happens after someone subscribes to the Observable, as well as when results passed to the callback will be emitted. By default, subscribing to the Observable calls func synchronously, but using Scheduler.async as the last parameter will defer the call to func, just like wrapping the call in setTimeout with a timeout of 0 would. If you use the async Scheduler and call subscribe on the output Observable, all function calls that are currently executing will end before func is invoked.
By default, results passed to the callback are emitted immediately after func invokes the callback. In particular, if the callback is called synchronously, the subscription to the resulting Observable will call the next function synchronously as well. If you want to defer that call, you may use Scheduler.async just as before. This means that by using Scheduler.async you can ensure that func always calls its callback asynchronously, thus avoiding terrifying Zalgo.
Note that the Observable created by the output function will always emit a single value and then complete immediately. If func calls the callback multiple times, values from subsequent calls will not appear in the stream. If you need to listen for multiple calls, you probably want to use fromEvent or fromEventPattern instead.
If func depends on some context (the this property) and is not already bound, the context of func will be the context that the output function has at call time. In particular, if func is called as a method of some object and is not already bound, it is recommended that the context of the output function be set to that object as well, in order to preserve the context.
If the input function calls its callback in the "node style" (i.e. the first argument to the callback is an optional error parameter signaling whether the call failed or not), bindNodeCallback provides convenient error handling and is probably a better choice. bindCallback will treat such functions the same as any other, and error parameters (whether passed or not) will always be interpreted as regular callback arguments.
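The rules above (lazy invocation, single value versus array, selector, forwarded context) can be approximated in a few lines of plain JavaScript. This is only a sketch under simplifying assumptions: there is no Scheduler support, observers are plain objects, and the names bindCallbackSketch, fakeLookup and account are hypothetical and not part of RxJS.

```javascript
// Hypothetical stand-in for Rx.Observable.bindCallback (no Scheduler support).
function bindCallbackSketch(func, selector) {
  return function (...args) {
    const context = this; // func later runs with the context of the output function
    return {
      subscribe(observer) {
        let done = false;
        // func is invoked here, on subscription, not when the output function is called.
        func.apply(context, [...args, (...results) => {
          if (done) return; // later callback invocations are ignored
          done = true;
          const value = selector
            ? selector(...results) // the selector receives the raw arguments
            : (results.length <= 1 ? results[0] : results); // default: value or array
          observer.next(value);
          observer.complete();
        }]);
      }
    };
  };
}

let calls = 0;
// Hypothetical callback API that reports three values.
function fakeLookup(key, callback) {
  calls += 1;
  callback(key, key.length, true);
}

const emitted = [];
const observer = { next: v => emitted.push(v), complete: () => emitted.push('done') };

const lookup = bindCallbackSketch(fakeLookup);
const pending = lookup('abc');
const callsBeforeSubscribe = calls; // still 0: fakeLookup has not run yet
pending.subscribe(observer);        // fakeLookup runs now; emits ['abc', 3, true]

const joined = bindCallbackSketch(fakeLookup, (key, length) => key + ':' + length);
joined('abc').subscribe(observer);  // emits 'abc:3'

// Context: the output function forwards its own `this` to func.
const account = { owner: 'alice', whoAmI(callback) { callback(this.owner); } };
bindCallbackSketch(account.whoAmI).call(account).subscribe(observer); // emits 'alice'
```

Calling the output function without .call(account) in the last step would leave this.owner unresolved, which is why the text recommends setting the context explicitly.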
Params:
Name | Type | Attribute | Description |
func | function | | A function with a callback as the last parameter. |
selector | function | optional | A function which takes the arguments from the callback and maps them to a value that is emitted on the output Observable. |
scheduler | Scheduler | optional | The scheduler on which to schedule the callbacks. |
Return:
function(...params: *): Observable | A function which returns the Observable that delivers the same values the callback would deliver. |
Example:
// Convert jQuery's getJSON to an Observable API.
// Suppose we have jQuery.getJSON('/my/url', callback)
var getJSONAsObservable = Rx.Observable.bindCallback(jQuery.getJSON);
var result = getJSONAsObservable('/my/url');
result.subscribe(x => console.log(x), e => console.error(e));

// Receive an array of the arguments passed to the callback.
someFunction((a, b, c) => {
  console.log(a); // 5
  console.log(b); // 'some string'
  console.log(c); // {someProperty: 'someValue'}
});
const boundSomeFunction = Rx.Observable.bindCallback(someFunction);
boundSomeFunction().subscribe(values => {
  console.log(values); // [5, 'some string', {someProperty: 'someValue'}]
});

// Use bindCallback with a selector function (separate snippet with a different someFunction).
someFunction((a, b, c) => {
  console.log(a); // 'a'
  console.log(b); // 'b'
  console.log(c); // 'c'
});
const boundSomeFunction = Rx.Observable.bindCallback(someFunction, (a, b, c) => a + b + c);
boundSomeFunction().subscribe(value => {
  console.log(value); // 'abc'
});

// Compare behaviour with and without the async Scheduler.
function iCallMyCallbackSynchronously(cb) {
  cb();
}
const boundSyncFn = Rx.Observable.bindCallback(iCallMyCallbackSynchronously);
const boundAsyncFn = Rx.Observable.bindCallback(iCallMyCallbackSynchronously, null, Rx.Scheduler.async);
boundSyncFn().subscribe(() => console.log('I was sync!'));
boundAsyncFn().subscribe(() => console.log('I was async!'));
console.log('This happened...');
// Logs:
// I was sync!
// This happened...
// I was async!

// Use bindCallback on an object method.
const boundMethod = Rx.Observable.bindCallback(someObject.methodWithCallback);
boundMethod.call(someObject) // make sure methodWithCallback has access to someObject
  .subscribe(subscriber);
public static bindNodeCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable source
Converts a Node.js-style callback API to a function that returns an Observable.
It's just like bindCallback, but the callback is expected to be of type callback(error, result).
bindNodeCallback is not an operator because its input and output are not Observables. The input is a function func with some parameters, but the last parameter must be a callback function that func calls when it is done. The callback function is expected to follow Node.js conventions, where the first argument to the callback is an error object, signaling whether the call was successful. If that object is passed to the callback, it means something went wrong.
The output of bindNodeCallback is a function that takes the same parameters as func, except the last one (the callback). When the output function is called with arguments, it will return an Observable. If func calls its callback with the error parameter present, the Observable will error with that value as well. If the error parameter is not passed, the Observable will emit the second parameter. If there are more parameters (third and so on), the Observable will emit an array with all arguments except the first error argument.
Optionally, bindNodeCallback accepts a selector function, which allows you to make the resulting Observable emit a value computed by the selector instead of the regular callback arguments. It works similarly to the bindCallback selector, but the Node.js-style error argument will never be passed to that function.
Note that func will not be called at the same time the output function is, but rather whenever the resulting Observable is subscribed. By default the call to func will happen synchronously after subscription, but that can be changed by providing a suitable Scheduler as the optional third parameter. The Scheduler can also control when values from the callback will be emitted by the Observable. To find out more, check out the documentation for bindCallback, where the Scheduler works exactly the same.
As in bindCallback, the context (the this property) of the input function will be set to the context of the returned function when it is called.
After the Observable emits a value, it will complete immediately. This means that even if func calls the callback again, values from the second and subsequent calls will never appear on the stream. If you need to handle functions that call callbacks multiple times, check out fromEvent or fromEventPattern instead.
Note that bindNodeCallback can be used in non-Node.js environments as well. "Node.js-style" callbacks are just a convention, so if you write for browsers or any other environment and the API you use implements that callback style, bindNodeCallback can be safely used on those API functions as well.
Remember that the error object passed to the callback does not have to be an instance of the JavaScript built-in Error object. In fact, it does not even have to be an object. The error parameter of the callback function is interpreted as "present" when the value of that parameter is truthy. It could be, for example, a non-zero number, a non-empty string or the boolean true. In all of these cases the resulting Observable would error with that value. This means that regular-style callbacks will usually fail very often when bindNodeCallback is used. If your Observable errors much more often than you would expect, check whether the callback really is called in Node.js style and, if not, switch to bindCallback instead.
Note that even if the error parameter is technically present in the callback, but its value is falsy, it still won't appear in the array emitted by the Observable or in the selector function.
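The truthiness rule for the error argument is easy to trip over, so here is a small dependency-free sketch of it. It assumes plain-object observers and omits the selector and Scheduler; bindNodeCallbackSketch is a hypothetical name, not the library function.

```javascript
// Hypothetical stand-in for Rx.Observable.bindNodeCallback (no selector, no Scheduler).
function bindNodeCallbackSketch(func) {
  return function (...args) {
    const context = this;
    return {
      subscribe(observer) {
        let done = false;
        func.apply(context, [...args, (err, ...results) => {
          if (done) return; // only the first callback invocation counts
          done = true;
          if (err) { // any truthy first argument is treated as an error
            observer.error(err);
            return;
          }
          // The error argument is dropped; emit one value or an array of values.
          observer.next(results.length <= 1 ? results[0] : results);
          observer.complete();
        }]);
      }
    };
  };
}

const log = [];
const observer = {
  next: v => log.push(['next', v]),
  error: e => log.push(['error', e]),
  complete: () => log.push(['complete'])
};

// Node.js style: a falsy first argument means success.
const nodeStyle = bindNodeCallbackSketch((x, cb) => cb(null, x * 2));
nodeStyle(21).subscribe(observer); // next 42, then complete

// Regular style: the first argument (5) is truthy, so it is reported as an error.
const regularStyle = bindNodeCallbackSketch(cb => cb(5));
regularStyle().subscribe(observer); // error 5
```

The second call shows why a regular-style callback wrapped this way appears to fail: its first result is read as the error.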
Params:
Name | Type | Attribute | Description |
func | function | | Function with a Node.js-style callback as the last parameter. |
selector | function | optional | A function which takes the arguments from the callback and maps those to a value to emit on the output Observable. |
scheduler | Scheduler | optional | The scheduler on which to schedule the callbacks. |
Return:
function(...params: *): Observable | A function which returns the Observable that delivers the same values the Node.js callback would deliver. |
Example:
// Read a file from the filesystem and get its contents as an Observable.
import * as fs from 'fs';
var readFileAsObservable = Rx.Observable.bindNodeCallback(fs.readFile);
var result = readFileAsObservable('./roadNames.txt', 'utf8');
result.subscribe(x => console.log(x), e => console.error(e));

// Use on a function that calls its callback with multiple arguments.
someFunction((err, a, b) => {
  console.log(err); // null
  console.log(a); // 5
  console.log(b); // "some string"
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction);
boundSomeFunction()
  .subscribe(value => {
    console.log(value); // [5, "some string"]
  });

// Use with a selector function.
someFunction((err, a, b) => {
  console.log(err); // undefined
  console.log(a); // "abc"
  console.log(b); // "DEF"
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction, (a, b) => a + b);
boundSomeFunction()
  .subscribe(value => {
    console.log(value); // "abcDEF"
  });

// Use on a function that calls its callback in the regular style.
someFunction(a => {
  console.log(a); // 5
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction);
boundSomeFunction()
  .subscribe(
    value => {}, // never gets called
    err => console.log(err) // 5
  );
public static combineLatest(observable1: ObservableInput, observable2: ObservableInput, project: function, scheduler: Scheduler): Observable source
Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.
Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.
combineLatest combines the values from all the Observables passed as arguments. This is done by subscribing to each Observable in order and, whenever any Observable emits, collecting an array of the most recent values from each Observable. So if you pass n Observables to the operator, the returned Observable will always emit an array of n values, in an order corresponding to the order of the passed Observables (the value from the first Observable in the first place and so on).
The static version of combineLatest accepts either an array of Observables, or each Observable can be passed directly as an argument. Note that an array of Observables is a good choice if you don't know beforehand how many Observables you will combine. Passing an empty array will result in an Observable that completes immediately.
To ensure the output array always has the same length, combineLatest will actually wait for all input Observables to emit at least once before it starts emitting results. This means that if some Observable emits values before the other Observables have started emitting, all of those values but the last will be lost. On the other hand, if some Observable does not emit a value but completes, the resulting Observable will complete at the same moment without emitting anything, since it will now be impossible to include a value from the completed Observable in the resulting array. Also, if some input Observable does not emit any value and never completes, combineLatest will also never emit and never complete, since, again, it will wait for all streams to emit some value.
If at least one Observable was passed to combineLatest and all passed Observables emitted something, the resulting Observable will complete when all combined streams complete. So even if some Observable completes, the result of combineLatest will still emit values when the other Observables do. In the case of a completed Observable, its value from then on will always be the last emitted value. On the other hand, if any Observable errors, combineLatest will error immediately as well, and all other Observables will be unsubscribed.
combineLatest accepts an optional project function, which takes as arguments all the values that would normally be emitted by the resulting Observable. project can return any kind of value, which will then be emitted by the Observable instead of the default array. Note that project does not take the array of values as its argument, but the values themselves. That means the default project can be imagined as a function that takes all its arguments and puts them into an array.
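To make the "wait until every input has emitted" rule and the project calling convention concrete, the following sketch models inputs as hand-driven sources. It deliberately leaves out completion, errors and unsubscription; makeSource and combineLatestSketch are hypothetical helpers, not RxJS APIs.

```javascript
// Hypothetical push-based source: emit(value) notifies every subscribed listener.
function makeSource() {
  const listeners = [];
  return {
    subscribe: fn => { listeners.push(fn); },
    emit: value => listeners.forEach(fn => fn(value))
  };
}

// Sketch of the combineLatest rule: no output until every input has emitted once.
function combineLatestSketch(sources, project, onValue) {
  const latest = new Array(sources.length);
  const seen = new Array(sources.length).fill(false);
  sources.forEach((source, i) => {
    source.subscribe(value => {
      latest[i] = value;
      seen[i] = true;
      if (seen.every(Boolean)) {
        // project receives the values as separate arguments, not as one array.
        onValue(project ? project(...latest) : latest.slice());
      }
    });
  });
}

const weight = makeSource();
const height = makeSource();
const pairs = [];
const labels = [];
combineLatestSketch([weight, height], null, v => pairs.push(v));
combineLatestSketch([weight, height], (w, h) => w + 'kg/' + h + 'm', v => labels.push(v));

weight.emit(70);   // nothing yet: height has not emitted
weight.emit(72);   // still nothing; the earlier 70 is lost
height.emit(1.76); // [72, 1.76]
weight.emit(76);   // [76, 1.76]
```

Both consumers see two outputs for four input emissions, which is the loss of early values described above.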
Params:
Name | Type | Attribute | Description |
observable1 | ObservableInput | | An input Observable to combine with other Observables. |
observable2 | ObservableInput | | An input Observable to combine with other Observables. More than one input Observable may be given as arguments, or an array of Observables may be given as the first argument. |
project | function | optional | An optional function to project the values from the combined latest values into a new value on the output Observable. |
scheduler | Scheduler | optional | The IScheduler to use for subscribing to each input Observable. |
Return:
Observable | An Observable of projected values from the most recent values from each input Observable, or an array of the most recent values from each input Observable. |
Example:
// Combine two timer Observables.
const firstTimer = Rx.Observable.timer(0, 1000); // emit 0, 1, 2... every second, starting from now
const secondTimer = Rx.Observable.timer(500, 1000); // emit 0, 1, 2... every second, starting 0.5s from now
const combinedTimers = Rx.Observable.combineLatest(firstTimer, secondTimer);
combinedTimers.subscribe(value => console.log(value));
// Logs
// [0, 0] after 0.5s
// [1, 0] after 1s
// [1, 1] after 1.5s
// [2, 1] after 2s

// Combine an array of Observables.
const observables = [1, 5, 10].map(
  n => Rx.Observable.of(n).delay(n * 1000).startWith(0) // emit 0 and then emit n after n seconds
);
const combined = Rx.Observable.combineLatest(observables);
combined.subscribe(value => console.log(value));
// Logs
// [0, 0, 0] immediately
// [1, 0, 0] after 1s
// [1, 5, 0] after 5s
// [1, 5, 10] after 10s

// Use a project function to dynamically calculate the Body-Mass Index.
var weight = Rx.Observable.of(70, 72, 76, 79, 75);
var height = Rx.Observable.of(1.76, 1.77, 1.78);
var bmi = Rx.Observable.combineLatest(weight, height, (w, h) => w / (h * h));
bmi.subscribe(x => console.log('BMI is ' + x));
// With output to console:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222
public static concat(input1: ObservableInput, input2: ObservableInput, scheduler: Scheduler): Observable source
Creates an output Observable which sequentially emits all values from given Observable and then moves on to the next.
Concatenates multiple Observables together by sequentially emitting their values, one Observable after the other.
concat joins multiple Observables together by subscribing to them one at a time and merging their results into the output Observable. You can pass either an array of Observables, or put them directly as arguments. Passing an empty array will result in an Observable that completes immediately.
concat will subscribe to the first input Observable and emit all its values, without changing or affecting them in any way. When that Observable completes, it will subscribe to the next Observable passed and, again, emit its values. This will be repeated until the operator runs out of Observables. When the last input Observable completes, concat will complete as well. At any given moment only one Observable passed to the operator emits values. If you would like to emit values from the passed Observables concurrently, check out merge instead, especially with its optional concurrent parameter. As a matter of fact, concat is equivalent to the merge operator with the concurrent parameter set to 1.
Note that if some input Observable never completes, concat will also never complete, and Observables following the one that did not complete will never be subscribed. On the other hand, if some Observable simply completes immediately after it is subscribed, it will be invisible to concat, which will just move on to the next Observable.
If any Observable in the chain errors, instead of passing control to the next Observable, concat will error immediately as well. Observables that would have been subscribed after the one that emitted the error never will be.
If you pass the same Observable to concat many times, its stream of values will be "replayed" on every subscription, which means you can repeat a given Observable as many times as you like. If passing the same Observable to concat 1000 times becomes tedious, you can always use repeat.
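The sequencing rules above can be traced with a small synchronous model in which a cold "observable" is just a function that takes an observer. This is an illustrative sketch only (no Scheduler, no unsubscription); fromArray and concatSketch are hypothetical helpers rather than RxJS functions.

```javascript
// A cold, synchronous "observable" here is just a function that takes an observer.
const fromArray = values => observer => {
  values.forEach(v => observer.next(v));
  observer.complete();
};

// Sketch of concat: subscribe to the next input only after the current one completes.
function concatSketch(...inputs) {
  return observer => {
    const subscribeTo = index => {
      if (index === inputs.length) {
        observer.complete(); // ran out of inputs
        return;
      }
      inputs[index]({
        next: v => observer.next(v),
        error: e => observer.error(e),         // an error stops the whole chain
        complete: () => subscribeTo(index + 1) // move on to the next input
      });
    };
    subscribeTo(0);
  };
}

const log = [];
const observer = {
  next: v => log.push(v),
  error: e => log.push('error: ' + e),
  complete: () => log.push('complete')
};

const pair = fromArray([0, 1]);
const nothing = fromArray([]); // completes immediately, so it is invisible in the output
concatSketch(pair, nothing, pair)(observer); // 0, 1, 0, 1, complete

const failing = o => o.error('boom');
concatSketch(failing, pair)(observer); // error: boom; pair is never subscribed
```

Passing pair twice replays its values, matching the remark about repeating the same Observable.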
Params:
Name | Type | Attribute | Description |
input1 | ObservableInput | | An input Observable to concatenate with others. |
input2 | ObservableInput | | An input Observable to concatenate with others. More than one input Observable may be given as arguments. |
scheduler | Scheduler | optional | An optional IScheduler to schedule each Observable subscription on. |
Return:
Observable | All values of each passed Observable merged into a single Observable, in order, in serial fashion. |
Example:
// Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10.
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = Rx.Observable.concat(timer, sequence);
result.subscribe(x => console.log(x));
// results in:
// 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10

// Concatenate an array of three Observables.
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var result = Rx.Observable.concat([timer1, timer2, timer3]); // note that array is passed
result.subscribe(x => console.log(x));
// results in the following:
// (Prints to console sequentially)
// -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
// -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
// -500ms-> 0 -500ms-> 1 -500ms-> ... 9

// Concatenate the same Observable to repeat it.
const timer = Rx.Observable.interval(1000).take(2);
Rx.Observable.concat(timer, timer) // concatenating the same Observable!
  .subscribe(
    value => console.log(value),
    err => {},
    () => console.log('...and it is done!')
  );
// Logs:
// 0 after 1s
// 1 after 2s
// 0 after 3s
// 1 after 4s
// "...and it is done!" also after 4s
public static create(onSubscription: function(observer: Observer): TeardownLogic): Observable source
Creates a new Observable, that will execute the specified function when an Observer subscribes to it.
Creates a custom Observable that does whatever you like.
create converts an onSubscription function to an actual Observable. Whenever someone subscribes to that Observable, the function will be called with an Observer instance as its first and only parameter. onSubscription should then invoke the Observer's next, error and complete methods.
Calling next with a value will emit that value to the observer. Calling complete means that the Observable finished emitting and will not do anything else. Calling error means that something went wrong; the value passed to the error method should provide details on what exactly happened.
A well-formed Observable can emit as many values as it needs via the next method, but the complete and error methods can be called only once, and nothing else can be called thereafter. If you try to invoke the next, complete or error methods after the created Observable has already completed or ended with an error, these calls will be ignored to preserve the so-called Observable Contract. Note that you are not required to call complete at any point; it is perfectly fine to create an Observable that never ends, depending on your needs.
onSubscription can optionally return either a function or an object with an unsubscribe method. In both cases the function or method will be called when the subscription to the Observable is being cancelled, and it should be used to clean up all resources. So, for example, if you are using setTimeout in your custom Observable, when someone unsubscribes you can clear the planned timeout, so that it does not fire needlessly and the browser (or other environment) does not waste computing power on a timing event that no one will listen to anyway.
Most of the time you should not need to use create, because existing operators allow you to create an Observable for most use cases. That being said, create is a low-level mechanism allowing you to create any Observable, if you have very specific needs.
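As a rough illustration of the Observable Contract and of teardown handling described above, the sketch below wraps the observer so that calls made after completion are dropped. It is a simplification under stated assumptions: createSketch is a hypothetical name, observers are plain objects, and teardown runs only on an explicit unsubscribe.

```javascript
// Hypothetical stand-in for Rx.Observable.create that guards the observer.
function createSketch(onSubscription) {
  return {
    subscribe(observer) {
      let closed = false;
      const guarded = {
        next: v => { if (!closed) observer.next(v); },
        error: e => { if (!closed) { closed = true; observer.error(e); } },
        complete: () => { if (!closed) { closed = true; observer.complete(); } }
      };
      // onSubscription may return a function or an object with an unsubscribe method.
      const teardown = onSubscription(guarded);
      return {
        unsubscribe() {
          closed = true;
          if (typeof teardown === 'function') teardown();
          else if (teardown && typeof teardown.unsubscribe === 'function') teardown.unsubscribe();
        }
      };
    }
  };
}

const log = [];
const observer = {
  next: v => log.push(v),
  error: e => log.push('error: ' + e),
  complete: () => log.push('complete')
};

createSketch(o => {
  o.next(1);
  o.complete();
  o.next(2);       // ignored: the stream has already completed
  o.error('late'); // ignored as well
}).subscribe(observer);

let released = false;
const subscription = createSketch(o => {
  o.next('open');
  return () => { released = true; }; // cleanup function
}).subscribe(observer);
subscription.unsubscribe(); // runs the cleanup function
```

After this runs, log holds 1, 'complete' and 'open', and released is true.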
TypeScript signature issue
Because Observable extends a class which already defines a static create function, but with a different type signature, it was impossible to assign a proper signature to Observable.create. Because of that, it has the very general type Function, and thus the function passed to create will not be type checked unless you explicitly state what signature it should have.
When using TypeScript, we recommend declaring the type signature of the function passed to create as (observer: Observer) => TeardownLogic, where Observer and TeardownLogic are interfaces provided by the library.
Params:
Name | Type | Attribute | Description |
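onSubscription | function(observer: Observer): TeardownLogic | | A function that accepts an Observer, and invokes its next, error, and complete methods as appropriate, and optionally returns some logic for cleaning up resources. |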
Example:
// Emit three numbers, then complete.
var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});
observable.subscribe(
  value => console.log(value),
  err => {},
  () => console.log('this is the end')
);
// Logs
// 1
// 2
// 3
// "this is the end"

// Emit an error (separate snippet).
const observable = Rx.Observable.create((observer) => {
  observer.error('something went really wrong...');
});
observable.subscribe(
  value => console.log(value), // will never be called
  err => console.log(err),
  () => console.log('complete') // will never be called
);
// Logs
// "something went really wrong..."

// Return an unsubscribe function (separate snippet).
const observable = Rx.Observable.create(observer => {
  const id = setTimeout(() => observer.next('...'), 5000); // emit value after 5s
  return () => { clearTimeout(id); console.log('cleared!'); };
});
const subscription = observable.subscribe(value => console.log(value));
setTimeout(() => subscription.unsubscribe(), 3000); // cancel subscription after 3s
// Logs:
// "cleared!" after 3s
// Never logs "..."
public static defer(observableFactory: function(): SubscribableOrPromise): Observable source
Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer.
Creates the Observable lazily, that is, only when it is subscribed.
defer allows you to create the Observable only when the Observer subscribes, and create a fresh Observable for each Observer. It waits until an Observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual Observable.
Params:
Name | Type | Attribute | Description |
observableFactory | function(): SubscribableOrPromise | The Observable factory function to invoke for each Observer that subscribes to the output Observable. May also return a Promise, which will be converted on the fly to an Observable. |
Return:
Observable | An Observable whose Observers' subscriptions trigger an invocation of the given Observable factory function. |
Example:
var clicksOrInterval = Rx.Observable.defer(function () {
if (Math.random() > 0.5) {
return Rx.Observable.fromEvent(document, 'click');
} else {
return Rx.Observable.interval(1000);
}
});
clicksOrInterval.subscribe(x => console.log(x));
// Results in the following behavior:
// If the result of Math.random() is greater than 0.5 it will listen
// for clicks anywhere on the "document"; when document is clicked it
// will log a MouseEvent object to the console. If the result is less
// than 0.5 it will emit ascending numbers, one every second(1000ms).
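The "fresh Observable for each Observer" rule can be sketched in plain JavaScript. This is an illustrative model under stated assumptions, not the library's code: deferModel is a hypothetical name, and an "Observable" is simplified to an array of values delivered synchronously.

```javascript
// Hypothetical model of defer: the factory runs once per subscriber.
function deferModel(factory) {
  return {
    subscribe(next) {
      const inner = factory(); // invoked lazily, at subscription time
      inner.forEach(next);     // here an "Observable" is just an array of values
    }
  };
}

let factoryCalls = 0;
const deferred = deferModel(() => {
  factoryCalls += 1;
  return [factoryCalls * 10];
});

const callsBeforeSubscribe = factoryCalls; // still 0: nothing ran yet
const seen = [];
deferred.subscribe(x => seen.push(x));
deferred.subscribe(x => seen.push(x));
console.log(callsBeforeSubscribe, seen, factoryCalls); // 0 [ 10, 20 ] 2
```

Each subscriber receives a different value because the factory is re-run for every subscription.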
public static empty(scheduler: Scheduler): Observable source
Creates an Observable that emits no items to the Observer and immediately emits a complete notification.
Just emits 'complete', and nothing else.
This static operator is useful for creating a simple Observable that only emits the complete notification. It can be used for composing with other Observables, such as in a mergeMap.
Params:
Name | Type | Attribute | Description |
scheduler | Scheduler | An IScheduler to use for scheduling the emission of the complete notification. |
Example:
var result = Rx.Observable.empty().startWith(7);
result.subscribe(x => console.log(x));
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
x % 2 === 1 ? Rx.Observable.of('a', 'b', 'c') : Rx.Observable.empty()
);
result.subscribe(x => console.log(x));
// Results in the following to the console:
// x is the count emitted by the interval, e.g. 0, 1, 2, 3, ...
// a new x arrives every 1000ms
// if x % 2 is equal to 1, 'a', 'b' and 'c' are logged
// if x % 2 is not equal to 1, nothing is logged
public static forkJoin(sources: ...SubscribableOrPromise, project: function): Observable source
Joins last values emitted by passed Observables.
Wait for Observables to complete and then combine last values they emitted.
forkJoin is an operator that takes any number of Observables, which can be passed either as an array or directly as arguments. If no input Observables are provided, the resulting stream will complete immediately.
forkJoin will wait for all passed Observables to complete and then it will emit an array with the last values from the corresponding Observables. So if you pass n Observables to the operator, the resulting array will have n values, where the first value is the last thing emitted by the first Observable, the second value is the last thing emitted by the second Observable, and so on. That means forkJoin will not emit more than once and it will complete after that. If you need to emit combined values not only at the end of the lifecycle of the passed Observables, but also throughout it, try out combineLatest or zip instead.
In order for the resulting array to have the same length as the number of input Observables, whenever any of those Observables completes without emitting any value, forkJoin will complete at that moment as well and it will not emit anything either, even if it already has some last values from other Observables. Conversely, if there is an Observable that never completes, forkJoin will never complete either, unless at some point another Observable completes without emitting a value, which brings us back to the previous case. Overall, in order for forkJoin to emit a value, all Observables passed as arguments have to emit something at least once and complete.
If any input Observable errors at some point, forkJoin will error as well and all other Observables will be immediately unsubscribed.
Optionally, forkJoin accepts a project function, which will be called with the values that would normally land in the emitted array. Whatever is returned by the project function will appear in the output Observable instead. This means that the default project can be thought of as a function that takes all its arguments and puts them into an array. Note that the project function will be called only when the output Observable is supposed to emit a result.
Params:
Name | Type | Attribute | Description |
sources | ...SubscribableOrPromise | Any number of Observables provided either as an array or as arguments passed directly to the operator. |
project | function | Function that takes values emitted by input Observables and returns the value that will appear in the resulting Observable instead of the default array. |
Return:
Observable | Observable emitting either an array of last values emitted by passed Observables or value from project function. |
Example:
const observable = Rx.Observable.forkJoin(
Rx.Observable.of(1, 2, 3, 4),
Rx.Observable.of(5, 6, 7, 8)
);
observable.subscribe(
value => console.log(value),
err => {},
() => console.log('This is how it ends!')
);
// Logs:
// [4, 8]
// "This is how it ends!"
const observable = Rx.Observable.forkJoin(
Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete
Rx.Observable.interval(500).take(4) // emit 0, 1, 2, 3 every half a second and complete
);
observable.subscribe(
value => console.log(value),
err => {},
() => console.log('This is how it ends!')
);
// Logs:
// [2, 3] after 3 seconds
// "This is how it ends!" immediately after
const observable = Rx.Observable.forkJoin(
Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete
Rx.Observable.interval(500).take(4), // emit 0, 1, 2, 3 every half a second and complete
(n, m) => n + m
);
observable.subscribe(
value => console.log(value),
err => {},
() => console.log('This is how it ends!')
);
// Logs:
// 5 after 3 seconds
// "This is how it ends!" immediately after
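The emission rule of forkJoin (emit once, only if every input emitted at least one value before completing) can be checked with a small plain-JavaScript model. This is a sketch under stated assumptions, not the operator itself: forkJoinModel is a hypothetical helper, each input is represented by the array of values it emits before completing, and errors and timing are not modeled.

```javascript
// Hypothetical model: each source is the array of values it emits before completing.
// Returns the list of values the output would emit (zero or one entries).
function forkJoinModel(sources, project) {
  // No inputs, or any input completing without a value: nothing is emitted.
  if (sources.length === 0 || sources.some(values => values.length === 0)) {
    return [];
  }
  const lastValues = sources.map(values => values[values.length - 1]);
  // The default result is the array of last values; project replaces it.
  return [project ? project(...lastValues) : lastValues];
}

const plain = forkJoinModel([[1, 2, 3, 4], [5, 6, 7, 8]]);
const projected = forkJoinModel([[0, 1, 2], [0, 1, 2, 3]], (n, m) => n + m);
const withEmptyInput = forkJoinModel([[1, 2], []]);
console.log(plain, projected, withEmptyInput); // [ [ 4, 8 ] ] [ 5 ] []
```

The third call shows the case where one input completes without emitting: the output stays empty even though the other input produced values.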
public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T> source
Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.
Converts almost anything to an Observable.
Convert various other objects and data types into Observables. from converts a Promise or an array-like or an iterable object into an Observable that emits the items in that promise or array or iterable. A String, in this context, is treated as an array of characters. Observable-like objects (objects that contain a function named with the ES2015 Symbol for Observable) can also be converted through this operator.
Params:
Name | Type | Attribute | Description |
ish | ObservableInput<T> | A subscribable object, a Promise, an Observable-like, an Array, an iterable or an array-like object to be converted. |
scheduler | Scheduler | The scheduler on which to schedule the emissions of values. |
Return:
Observable<T> | The Observable whose values are originally from the input object that was converted. |
Example:
var array = [10, 20, 30];
var result = Rx.Observable.from(array);
result.subscribe(x => console.log(x));
// Results in the following:
// 10 20 30
function* generateDoubles(seed) {
var i = seed;
while (true) {
yield i;
i = 2 * i; // double it
}
}
var iterator = generateDoubles(3);
var result = Rx.Observable.from(iterator).take(10);
result.subscribe(x => console.log(x));
// Results in the following:
// 3 6 12 24 48 96 192 384 768 1536
public static fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T> source
Creates an Observable that emits events of a specific type coming from the given event target.
Creates an Observable from DOM events, or Node.js EventEmitter events or others.
fromEvent accepts as a first argument an event target, which is an object with methods for registering event handler functions. As a second argument it takes a string that indicates the type of event we want to listen for. fromEvent supports selected types of event targets, which are described in detail below. If your event target does not match any of the ones listed, you should use fromEventPattern, which can be used on arbitrary APIs.
When it comes to the APIs supported by fromEvent, their methods for adding and removing event handler functions have different names, but they all accept a string describing the event type and the function itself, which will be called whenever said event happens.
Every time resulting Observable is subscribed, event handler function will be registered to event target on given event type. When that event fires, value passed as a first argument to registered function will be emitted by output Observable. When Observable is unsubscribed, function will be unregistered from event target.
Note that if the event target calls the registered function with more than one argument, the second and following arguments will not appear in the resulting stream. In order to get access to them, you can pass to fromEvent the optional selector function, which will be called with all arguments passed to the event handler. The output Observable will then emit the value returned by the selector function, instead of the usual value.
Remember that the event targets listed below are checked via duck typing. It means that no matter what kind of object you have and no matter what environment you work in, you can safely use fromEvent on that object if it exposes the described methods (provided of course they behave as described above). So for example, if a Node.js library exposes an event target which has the same method names as DOM EventTarget, fromEvent is still a good choice.
If the API you use is more callback than event handler oriented (the subscribed callback function fires only once and thus there is no need to manually unregister it), you should use bindCallback or bindNodeCallback instead.
fromEvent supports the following types of event targets:
DOM EventTarget
This is an object with addEventListener and removeEventListener methods. In the browser, addEventListener accepts, apart from the event type string and event handler function arguments, an optional third parameter, which is either an object or a boolean, both used for additional configuration of how and when the passed function will be called. When fromEvent is used with an event target of that type, you can provide these values as the third parameter as well.
Node.js EventEmitter
An object with addListener and removeListener methods.
JQuery-style event target
An object with on and off methods.
DOM NodeList
A list of DOM Nodes, returned for example by document.querySelectorAll or Node.childNodes. Although this collection is not an event target in itself, fromEvent will iterate over all Nodes it contains and install the event handler function in each of them. When the returned Observable is unsubscribed, the function will be removed from all Nodes.
DOM HTMLCollection
Just as in the case of NodeList, it is a collection of DOM nodes. Here as well, the event handler function is installed and removed in each of the elements.
Params:
Name | Type | Attribute | Description |
target | EventTargetLike | The DOM EventTarget, Node.js EventEmitter, JQuery-like event target, NodeList or HTMLCollection to attach the event handler to. |
|
eventName | string | The event name of interest, being emitted by the
|
|
options | EventListenerOptions | Options to pass through to addEventListener |
selector | SelectorMethodSignature<T> | An optional function to post-process results. It takes the arguments from the event handler and should return a single value. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));
// Results in:
// MouseEvent object logged to console every time a click
// occurs on the document.
var clicksInDocument = Rx.Observable.fromEvent(document, 'click', true); // note optional configuration parameter
// which will be passed to addEventListener
var clicksInDiv = Rx.Observable.fromEvent(someDivInDocument, 'click');
clicksInDocument.subscribe(() => console.log('document'));
clicksInDiv.subscribe(() => console.log('div'));
// By default events bubble UP in DOM tree, so normally
// when we would click on div in document
// "div" would be logged first and then "document".
// Since we specified optional `capture` option, document
// will catch event when it goes DOWN DOM tree, so console
// will log "document" and then "div".
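The duck-typed registration described for fromEvent can be sketched in plain JavaScript. This is an illustrative model, not the RxJS source: fromEventModel and fakeTarget are hypothetical names, the order in which the method pairs are checked is an assumption of this sketch, and NodeList/HTMLCollection handling, options and the selector are left out.

```javascript
// Hypothetical sketch of duck-typed handler registration; NOT the RxJS source.
// The order of the checks below is an assumption of this sketch.
function fromEventModel(target, eventName) {
  let add, remove;
  if (typeof target.addEventListener === 'function') {
    [add, remove] = ['addEventListener', 'removeEventListener'];
  } else if (typeof target.addListener === 'function') {
    [add, remove] = ['addListener', 'removeListener'];
  } else if (typeof target.on === 'function') {
    [add, remove] = ['on', 'off'];
  } else {
    throw new TypeError('Unsupported event target');
  }
  return {
    subscribe(next) {
      const handler = firstArg => next(firstArg); // only the first argument is forwarded
      target[add](eventName, handler);            // registered on subscribe
      return { unsubscribe: () => target[remove](eventName, handler) };
    }
  };
}

// A made-up jQuery-style target exposing only on/off.
const handlers = new Set();
const fakeTarget = {
  on: (name, fn) => handlers.add(fn),
  off: (name, fn) => handlers.delete(fn),
  trigger: (...args) => handlers.forEach(fn => fn(...args))
};

const received = [];
const sub = fromEventModel(fakeTarget, 'ping').subscribe(x => received.push(x));
fakeTarget.trigger('a', 'ignored'); // second argument does not reach the stream
sub.unsubscribe();
fakeTarget.trigger('b');            // handler already removed
console.log(received, handlers.size); // [ 'a' ] 0
```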
public static fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function, signal?: any): void, selector: function(...args: any): T): Observable<T> source
Creates an Observable from an API based on addHandler/removeHandler functions.
Converts any addHandler/removeHandler API to an Observable.
Creates an Observable by using the addHandler and removeHandler functions to add and remove the handlers, with an optional selector function to project the event arguments to a result. The addHandler is called when the output Observable is subscribed, and removeHandler is called when the Subscription is unsubscribed.
Params:
Name | Type | Attribute | Description |
addHandler | function(handler: Function): any | A function that takes
a |
|
removeHandler | function(handler: Function, signal?: any): void |
|
An optional function that
takes a |
selector | function(...args: any): T | An optional function to post-process results. It takes the arguments from the event handler and should return a single value. |
Example:
function addClickHandler(handler) {
document.addEventListener('click', handler);
}
function removeClickHandler(handler) {
document.removeEventListener('click', handler);
}
var clicks = Rx.Observable.fromEventPattern(
addClickHandler,
removeClickHandler
);
clicks.subscribe(x => console.log(x));
public static fromPromise(promise: PromiseLike<T>, scheduler: Scheduler): Observable<T> source
Converts a Promise to an Observable.
Returns an Observable that just emits the Promise's resolved value, then completes.
Converts an ES2015 Promise or a Promises/A+ spec compliant Promise to an Observable. If the Promise resolves with a value, the output Observable emits that resolved value as a next, and then completes. If the Promise is rejected, then the output Observable emits the corresponding Error.
Params:
Name | Type | Attribute | Description |
promise | PromiseLike<T> | The promise to be converted. |
scheduler | Scheduler | An optional IScheduler to use for scheduling the delivery of the resolved value (or the rejection). |
Example:
var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));
public static interval(period: number, scheduler: Scheduler): Observable source
Creates an Observable that emits sequential numbers every specified interval of time, on a specified IScheduler.
Emits incremental numbers periodically in time.
interval returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between those emissions. The first emission is not sent immediately, but only after the first period has passed. By default, this operator uses the async IScheduler to provide a notion of time, but you may pass any IScheduler to it.
Params:
Name | Type | Attribute | Description |
period | number | The interval size in milliseconds (by default) or the time unit determined by the scheduler's clock. |
scheduler | Scheduler | The IScheduler to use for scheduling the emission of values, and providing a notion of "time". |
Example:
var numbers = Rx.Observable.interval(1000);
numbers.subscribe(x => console.log(x));
public static merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable source
Creates an output Observable which concurrently emits all values from every given input Observable.
Flattens multiple Observables together by blending their values into one Observable.
merge subscribes to each given input Observable (as arguments), and simply forwards (without doing any transformation) all the values from all the input Observables to the output Observable. The output Observable only completes once all input Observables have completed. Any error delivered by an input Observable will be immediately emitted on the output Observable.
Params:
Name | Type | Attribute | Description |
observables | ...ObservableInput | Input Observables to merge together. |
concurrent | number | Maximum number of input Observables being subscribed to concurrently. |
scheduler | Scheduler | The IScheduler to use for managing concurrency of input Observables. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = Rx.Observable.merge(clicks, timer);
clicksOrTimer.subscribe(x => console.log(x));
// Results in the following:
// timer will emit ascending values, one every second (1000ms), to the console
// clicks logs MouseEvents to the console every time the "document" is clicked
// Since the two streams are merged you see these happening
// as they occur.
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // the argument
var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));
// Results in the following:
// - First timer1 and timer2 will run concurrently
// - timer1 will emit a value every 1000ms for 10 iterations
// - timer2 will emit a value every 2000ms for 6 iterations
// - after timer1 hits its max iteration, timer2 will
// continue, and timer3 will start to run concurrently with timer2
// - when timer2 hits its max iteration it terminates, and
// timer3 will continue to emit a value every 500ms until it is complete
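The effect of the concurrent argument in the example above can be traced with a small virtual-time model. This is a sketch under stated assumptions, not the operator's implementation: mergeSchedule is a hypothetical helper, each input is reduced to a name and the time it takes to complete after being subscribed, and concurrent is assumed to be at least 1.

```javascript
// Hypothetical model: each source is { name, duration } and is assumed to
// complete `duration` ms after it is subscribed. Returns subscribe/complete times.
function mergeSchedule(sources, concurrent) {
  const active = []; // sources currently subscribed
  const result = [];
  let now = 0;
  for (const source of sources) {
    if (active.length === concurrent) {
      // Wait for the earliest active source to complete, freeing a slot.
      active.sort((a, b) => a.end - b.end);
      now = active.shift().end;
    }
    const entry = { name: source.name, start: now, end: now + source.duration };
    active.push(entry);
    result.push(entry);
  }
  return result;
}

const schedule = mergeSchedule([
  { name: 'timer1', duration: 1000 * 10 }, // interval(1000).take(10)
  { name: 'timer2', duration: 2000 * 6 },  // interval(2000).take(6)
  { name: 'timer3', duration: 500 * 10 }   // interval(500).take(10)
], 2);
console.log(schedule);
// timer1: 0 to 10000, timer2: 0 to 12000, timer3: 10000 to 15000
```

timer3 is only subscribed at 10000ms, when timer1 completes and a slot becomes free, which matches the behavior listed in the example's comments.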
public static never(): Observable source
Creates an Observable that emits no items to the Observer.
An Observable that never emits anything.
This static operator is useful for creating a simple Observable that emits neither values nor errors nor the completion notification. It can be used for testing purposes or for composing with other Observables. Please note that by never emitting a complete notification, this Observable keeps the subscription from being disposed automatically. Subscriptions need to be manually disposed.
Example:
function info() {
console.log('Will not be called');
}
var result = Rx.Observable.never().startWith(7);
result.subscribe(x => console.log(x), info, info);
public static of(values: ...T, scheduler: Scheduler): Observable<T> source
Creates an Observable that emits some values you specify as arguments, immediately one after the other, and then emits a complete notification.
Emits the arguments you provide, then completes.
This static operator is useful for creating a simple Observable that only emits the arguments given, and the complete notification thereafter. It can be used for composing with other Observables, such as with concat. By default, it uses a null IScheduler, which means the next notifications are sent synchronously, although with a different IScheduler it is possible to determine when those notifications will be delivered.
Params:
Name | Type | Attribute | Description |
values | ...T | Arguments that represent |
|
scheduler | Scheduler |
|
A IScheduler to use for scheduling
the emissions of the |
Example:
var numbers = Rx.Observable.of(10, 20, 30);
var letters = Rx.Observable.of('a', 'b', 'c');
var interval = Rx.Observable.interval(1000);
var result = numbers.concat(letters).concat(interval);
result.subscribe(x => console.log(x));
public static range(start: number, count: number, scheduler: Scheduler): Observable source
Creates an Observable that emits a sequence of numbers within a specified range.
Emits a sequence of numbers in a range.
The range operator emits a range of sequential integers, in order, where you select the start of the range and its length (the count parameter). By default, it uses no IScheduler and just delivers the notifications synchronously, but it may use an optional IScheduler to regulate those deliveries.
Example:
var numbers = Rx.Observable.range(1, 10);
numbers.subscribe(x => console.log(x));
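Because the second argument is a count rather than an end value, it can help to list the values explicitly. The following plain-JavaScript sketch uses a hypothetical helper, rangeValues, to show which integers range(start, count) is expected to emit; it does not model schedulers.

```javascript
// Hypothetical helper listing the values range(start, count) would emit.
function rangeValues(start, count) {
  return Array.from({ length: count }, (_, i) => start + i);
}

const oneToTen = rangeValues(1, 10);
const fromFive = rangeValues(5, 3);
console.log(oneToTen); // [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
console.log(fromFive); // [ 5, 6, 7 ]
```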
public static throw(error: any, scheduler: Scheduler): Observable source
Creates an Observable that emits no items to the Observer and immediately emits an error notification.
Just emits 'error', and nothing else.
This static operator is useful for creating a simple Observable that only emits the error notification. It can be used for composing with other Observables, such as in a mergeMap.
Params:
Name | Type | Attribute | Description |
error | any | The particular Error to pass to the error notification. |
scheduler | Scheduler | An IScheduler to use for scheduling the emission of the error notification. |
Return:
Observable | An error Observable: emits only the error notification using the given error argument. |
Example:
var result = Rx.Observable.throw(new Error('oops!')).startWith(7);
result.subscribe(x => console.log(x), e => console.error(e));
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
x === 13 ?
Rx.Observable.throw('Thirteens are bad') :
Rx.Observable.of('a', 'b', 'c')
);
result.subscribe(x => console.log(x), e => console.error(e));
public static timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable source
Creates an Observable that starts emitting after an initialDelay and emits ever increasing numbers after each period of time thereafter.
It's like interval, but you can specify when the emissions should start.
timer returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time, period, of your choosing between those emissions. The first emission happens after the specified initialDelay. The initial delay may be a Date. By default, this operator uses the async IScheduler to provide a notion of time, but you may pass any IScheduler to it. If period is not specified, the output Observable emits only one value, 0. Otherwise, it emits an infinite sequence.
Params:
Name | Type | Attribute | Description |
initialDelay | number | Date | The initial delay time to wait before
emitting the first value of |
|
period | number | The period of time between emissions of the subsequent numbers. |
scheduler | Scheduler | The IScheduler to use for scheduling the emission of values, and providing a notion of "time". |
Return:
Observable | An Observable that emits a |
Example:
var numbers = Rx.Observable.timer(3000, 1000);
numbers.subscribe(x => console.log(x));
var numbers = Rx.Observable.timer(5000);
numbers.subscribe(x => console.log(x));
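The timing of the two examples above can be written out with a virtual-time model. This is a minimal sketch, not the scheduler-based implementation: timerEmissions is a hypothetical helper, times are plain millisecond numbers (a Date initialDelay is not modeled), and the infinite sequence is cut off at an until time chosen by the caller.

```javascript
// Hypothetical virtual-time model; all times are in milliseconds.
function timerEmissions(initialDelay, period, until) {
  const emissions = [];
  if (period === undefined) {
    // Without a period, a single value 0 is emitted after the initial delay.
    if (initialDelay <= until) emissions.push({ time: initialDelay, value: 0 });
    return emissions;
  }
  if (!(period > 0)) throw new RangeError('this sketch assumes period > 0');
  for (let value = 0, time = initialDelay; time <= until; value += 1, time += period) {
    emissions.push({ time, value });
  }
  return emissions;
}

const delayed = timerEmissions(3000, 1000, 6000);
const single = timerEmissions(5000, undefined, 60000);
// In this model, interval(period) behaves like timer(period, period).
const everySecond = timerEmissions(1000, 1000, 3000);
console.log(delayed);     // values 0..3 at 3000, 4000, 5000, 6000
console.log(single);      // value 0 at 5000
console.log(everySecond); // values 0..2 at 1000, 2000, 3000
```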
public static webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject source
Wrapper around the w3c-compatible WebSocket object provided by the browser.
Params:
Name | Type | Attribute | Description |
urlConfigOrSource | string | WebSocketSubjectConfig | The source of the WebSocket, given either as a URL or as a structure defining the WebSocket object. |
Example:
let socket$ = Observable.webSocket('ws://localhost:8081');
socket$.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete')
);
socket$.next(JSON.stringify({ op: 'hello' }));
import { w3cwebsocket } from 'websocket';
let socket$ = Observable.webSocket({
url: 'ws://localhost:8081',
WebSocketCtor: w3cwebsocket
});
socket$.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete')
);
socket$.next(JSON.stringify({ op: 'hello' }));
public static zip(observables: *): Observable<R> source
Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.
If the last parameter is a function, this function is used to compute the created value from the input values. Otherwise, an array of the input values is returned.
Params:
Name | Type | Attribute | Description |
observables | * |
Example:
let age$ = Observable.of<number>(27, 25, 29);
let name$ = Observable.of<string>('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of<boolean>(true, true, false);
Observable
.zip(age$,
name$,
isDev$,
(age: number, name: string, isDev: boolean) => ({ age, name, isDev }))
.subscribe(x => console.log(x));
// outputs
// { age: 27, name: 'Foo', isDev: true }
// { age: 25, name: 'Bar', isDev: true }
// { age: 29, name: 'Beer', isDev: false }
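The pairing rule of zip (the i-th value of every input is combined into the i-th output) can be modeled with arrays. This is an illustrative sketch, not the operator itself: zipModel is a hypothetical helper, each input is the array of values it emits before completing, and the sketch stops at the shortest input.

```javascript
// Hypothetical model: each source is the array of values it emits before completing.
function zipModel(sources, project) {
  if (sources.length === 0) return [];
  // Output stops once the shortest input runs out of values.
  const length = Math.min(...sources.map(values => values.length));
  const results = [];
  for (let i = 0; i < length; i += 1) {
    const group = sources.map(values => values[i]); // i-th value of every source
    results.push(project ? project(...group) : group);
  }
  return results;
}

const people = zipModel(
  [[27, 25, 29], ['Foo', 'Bar', 'Beer'], [true, true, false]],
  (age, name, isDev) => ({ age, name, isDev })
);
const pairs = zipModel([[1, 2, 3], ['a', 'b']]);
console.log(people); // three objects, matching the output listed above
console.log(pairs);  // [ [ 1, 'a' ], [ 2, 'b' ] ]
```

Without a function as the last argument, the model returns arrays of the grouped values, as the description states.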
Public Constructors
public constructor(subscribe: Function) source
Params:
Name | Type | Attribute | Description |
subscribe | Function | the function that is called when the Observable is
initially subscribed to. This function is given a Subscriber, to which new values
can be |
Public Methods
public [Symbol_observable](): Observable source
An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable
public audit(durationSelector: function(value: T): SubscribableOrPromise): Observable<T> source
Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.
It's like auditTime, but the silencing duration is determined by a second Observable.
audit is similar to throttle, but emits the last value from the silenced time window, instead of the first value. audit emits the most recent value from the source Observable on the output Observable as soon as its internal timer becomes disabled, and ignores source values while the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, the timer is enabled by calling the durationSelector function with the source value, which returns the "duration" Observable. When the duration Observable emits a value or completes, the timer is disabled, then the most recent source value is emitted on the output Observable, and this process repeats for the next source value.
Params:
Name | Type | Attribute | Description |
durationSelector | function(value: T): SubscribableOrPromise | A function that receives a value from the source Observable, for computing the silencing duration, returned as an Observable or a Promise. |
Return:
Observable<T> | An Observable that performs rate-limiting of emissions from the source Observable. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.audit(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
public auditTime(duration: number, scheduler: Scheduler): Observable<T> source
Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.
When it sees a source value, it ignores that plus the next ones for duration milliseconds, and then it emits the most recent value from the source.
auditTime is similar to throttleTime, but emits the last value from the silenced time window, instead of the first value. auditTime emits the most recent value from the source Observable on the output Observable as soon as its internal timer becomes disabled, and ignores source values while the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, the timer is enabled. After duration milliseconds (or the time unit determined internally by the optional scheduler) has passed, the timer is disabled, then the most recent source value is emitted on the output Observable, and this process repeats for the next source value. Optionally takes an IScheduler for managing timers.
Params:
Name | Type | Attribute | Description |
duration | number | Time to wait before emitting the most recent source
value, measured in milliseconds or the time unit determined internally
by the optional |
|
scheduler | Scheduler | The IScheduler to use for managing the timers that handle the rate-limiting behavior. |
Return:
Observable<T> | An Observable that performs rate-limiting of emissions from the source Observable. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.auditTime(1000);
result.subscribe(x => console.log(x));
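The timer-enabled and timer-disabled states described for auditTime can be traced with a virtual-time model. This is a sketch under stated assumptions, not the operator's implementation: auditTimeModel is a hypothetical helper, events must be sorted by time, a source value arriving exactly when the timer ends is treated as arriving after it, and the source is assumed to stay open until the last timer fires.

```javascript
// Hypothetical virtual-time model. `events` is a list of { time, value }
// sorted by time; the source is assumed to stay open until the last timer fires.
function auditTimeModel(events, duration) {
  const output = [];
  let timerEnd = null; // null means the internal timer is disabled
  let latest;
  for (const { time, value } of events) {
    if (timerEnd !== null && time >= timerEnd) {
      output.push({ time: timerEnd, value: latest }); // timer fired before this event
      timerEnd = null;
    }
    if (timerEnd === null) timerEnd = time + duration; // a value enables the timer
    latest = value;                                    // later values overwrite it
  }
  if (timerEnd !== null) output.push({ time: timerEnd, value: latest });
  return output;
}

const audited = auditTimeModel([
  { time: 0, value: 'a' },
  { time: 300, value: 'b' },
  { time: 900, value: 'c' },
  { time: 1500, value: 'd' },
  { time: 1600, value: 'e' }
], 1000);
console.log(audited);
// [ { time: 1000, value: 'c' }, { time: 2500, value: 'e' } ]
```

The first window opens at 0 and closes at 1000 with the most recent value 'c'; the value at 1500 opens a new window that closes at 2500 with 'e'.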
public buffer(closingNotifier: Observable<any>): Observable<T[]> source
Buffers the source Observable values until closingNotifier emits.
Collects values from the past as an array, and emits that array only when another Observable emits.
Buffers the incoming Observable values until the given closingNotifier Observable emits a value, at which point it emits the buffer on the output Observable and starts a new buffer internally, awaiting the next time closingNotifier emits.
Params:
Name | Type | Attribute | Description |
closingNotifier | Observable<any> | An Observable that signals the buffer to be emitted on the output Observable. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var buffered = interval.buffer(clicks);
buffered.subscribe(x => console.log(x));
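To make the buffering rule concrete, the following sketch replays it over timestamped data without RxJS. `bufferModel` and its arguments are hypothetical names for illustration. The model assumes no source value shares a timestamp with a notifier emission, and it shows that a notifier emission with nothing collected still produces an (empty) array.

```javascript
// Model of buffer: source is a list of { t, v } sorted by time,
// notifierTimes is a sorted list of times at which closingNotifier emits.
// Hypothetical helper for illustration; it does not use RxJS.
function bufferModel(source, notifierTimes) {
  var out = [];
  var i = 0;
  notifierTimes.forEach(function (n) {
    var buf = [];
    // Collect every source value that arrived before this notification.
    while (i < source.length && source[i].t < n) {
      buf.push(source[i].v);
      i++;
    }
    out.push(buf);
  });
  return out;
}

// interval(1000) values buffered by clicks at 2500 ms, 2700 ms and 4500 ms.
var bufferedByClicks = bufferModel(
  [{ t: 1000, v: 0 }, { t: 2000, v: 1 }, { t: 3000, v: 2 }, { t: 4000, v: 3 }],
  [2500, 2700, 4500]
);
console.log(bufferedByClicks);
```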
public bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]> source
Buffers the source Observable values until the size hits the maximum bufferSize given.
Collects values from the past as an array, and emits that array only when its size reaches bufferSize.
Buffers a number of values from the source Observable by bufferSize then emits the buffer and clears it, and starts a new buffer each startBufferEvery values. If startBufferEvery is not provided or is null, then new buffers are started immediately at the start of the source and when each buffer closes and is emitted.
Params:
Name | Type | Attribute | Description |
bufferSize | number | The maximum size of the buffer emitted. |
startBufferEvery | number | Interval at which to start a new buffer. For example if |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2);
buffered.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2, 1);
buffered.subscribe(x => console.log(x));
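The difference between the two calls above is easier to see on a finite input. The sketch below models the described behavior on a plain array; `bufferCountModel` is a hypothetical helper, not part of RxJS. It assumes the source completes after the last value and that any partially filled buffers are flushed at that point.

```javascript
// Model of bufferCount on a finite list of values.
// Hypothetical helper for illustration; it does not use RxJS.
function bufferCountModel(values, bufferSize, startBufferEvery) {
  // Without startBufferEvery, a new buffer starts when the previous one closes.
  var every = startBufferEvery == null ? bufferSize : startBufferEvery;
  var open = []; // buffers currently collecting, oldest first
  var out = [];
  values.forEach(function (v, i) {
    if (i % every === 0) {
      open.push([]);
    }
    open.forEach(function (b) { b.push(v); });
    // The oldest buffer is always the fullest, so only it can be complete.
    while (open.length > 0 && open[0].length === bufferSize) {
      out.push(open.shift());
    }
  });
  // Assumption: on completion, remaining non-empty buffers are emitted.
  open.forEach(function (b) {
    if (b.length > 0) {
      out.push(b);
    }
  });
  return out;
}

var pairs = bufferCountModel([1, 2, 3, 4, 5], 2);
var sliding = bufferCountModel([1, 2, 3, 4, 5], 2, 1);
console.log(pairs, sliding);
```

With `startBufferEvery` set to 1 the buffers overlap, which gives a sliding window over the source values.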
public bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]> source
Buffers the source Observable values for a specific time period.
Collects values from the past as an array, and emits those arrays periodically in time.
Buffers values from the source for a specific time duration bufferTimeSpan. Unless the optional argument bufferCreationInterval is given, it emits and resets the buffer every bufferTimeSpan milliseconds. If bufferCreationInterval is given, this operator opens the buffer every bufferCreationInterval milliseconds and closes (emits and resets) the buffer every bufferTimeSpan milliseconds. When the optional argument maxBufferSize is specified, the buffer will be closed either after bufferTimeSpan milliseconds or when it contains maxBufferSize elements.
Params:
Name | Type | Attribute | Description |
bufferTimeSpan | number | The amount of time to fill each buffer array. |
bufferCreationInterval | number | The interval at which to start new buffers. |
maxBufferSize | number | The maximum buffer size. |
scheduler | Scheduler | The scheduler on which to schedule the intervals that determine buffer boundaries. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(1000);
buffered.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(2000, 5000);
buffered.subscribe(x => console.log(x));
See:
- buffer
- bufferCount
- bufferToggle
- bufferWhen
- windowTime
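The interplay of bufferTimeSpan and bufferCreationInterval can be sketched as time windows over timestamped events. `bufferTimeModel` and its `horizon` parameter (the time up to which windows are simulated) are hypothetical and exist only for this illustration. The model assumes the first buffer opens at time 0, that no event falls exactly on a window boundary, and it ignores maxBufferSize and the scheduler.

```javascript
// Model of bufferTime: each window [open, open + span) yields one array.
// Hypothetical helper for illustration; it does not use RxJS.
function bufferTimeModel(events, span, creationInterval, horizon) {
  // Without a creation interval, the next buffer opens when the previous closes.
  var step = creationInterval == null ? span : creationInterval;
  var out = [];
  for (var open = 0; open + span <= horizon; open += step) {
    var close = open + span;
    out.push(
      events
        .filter(function (e) { return e.t >= open && e.t < close; })
        .map(function (e) { return e.v; })
    );
  }
  return out;
}

var clickEvents = [
  { t: 500, v: 'a' }, { t: 1500, v: 'b' }, { t: 2500, v: 'c' },
  { t: 5200, v: 'd' }, { t: 6900, v: 'e' }
];
var everySecond = bufferTimeModel(clickEvents, 1000, null, 3000);
var twoOfEveryFive = bufferTimeModel(clickEvents, 2000, 5000, 10000);
console.log(everySecond, twoOfEveryFive);
```

In the second call, the value 'c' at 2500 ms falls between windows and is not collected by any buffer.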
public bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]> source
Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits.
Collects values from the past as an array. Starts collecting only when openings emits, and calls the closingSelector function to get an Observable that tells when to close the buffer.
Buffers values from the source by opening the buffer via signals from an Observable provided to openings, and closing and sending the buffers when a Subscribable or Promise returned by the closingSelector function emits.
Params:
Name | Type | Attribute | Description |
openings | SubscribableOrPromise<O> | A Subscribable or Promise of notifications to start new buffers. |
closingSelector | function(value: O): SubscribableOrPromise | A function that takes the value emitted by the |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var buffered = clicks.bufferToggle(openings, i =>
i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
);
buffered.subscribe(x => console.log(x));
public bufferWhen(closingSelector: function(): Observable): Observable<T[]> source
Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer.
Collects values from the past as an array. When it starts collecting values, it calls a function that returns an Observable that tells when to close the buffer and restart collecting.
Opens a buffer immediately, then closes the buffer when the observable returned by calling the closingSelector function emits a value. When it closes the buffer, it immediately opens a new buffer and repeats the process.
Params:
Name | Type | Attribute | Description |
closingSelector | function(): Observable | A function that takes no arguments and returns an Observable that signals buffer closure. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferWhen(() =>
Rx.Observable.interval(1000 + Math.random() * 4000)
);
buffered.subscribe(x => console.log(x));
public catch(selector: function): Observable source
Catches errors on the observable to be handled by returning a new observable or throwing an error.
Params:
Name | Type | Attribute | Description |
selector | function | a function that takes as arguments |
Return:
Observable | An observable that originates from either the source or the observable returned by the
catch |
Example:
Observable.of(1, 2, 3, 4, 5)
  .map(n => {
    if (n == 4) {
      throw 'four!';
    }
    return n;
  })
  .catch(err => Observable.of('I', 'II', 'III', 'IV', 'V'))
  .subscribe(x => console.log(x));
// 1, 2, 3, I, II, III, IV, V
Observable.of(1, 2, 3, 4, 5)
  .map(n => {
    if (n === 4) {
      throw 'four!';
    }
    return n;
  })
  .catch((err, caught) => caught)
  .take(30)
  .subscribe(x => console.log(x));
// 1, 2, 3, 1, 2, 3, ...
Observable.of(1, 2, 3, 4, 5)
  .map(n => {
    if (n == 4) {
      throw 'four!';
    }
    return n;
  })
  .catch(err => {
    throw 'error in source. Details: ' + err;
  })
  .subscribe(
    x => console.log(x),
    err => console.log(err)
  );
// 1, 2, 3, error in source. Details: four!
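The first and third examples above follow the same control flow as a try/catch around the source. The following dependency-free sketch mirrors that flow on arrays; `runWithCatch` is a hypothetical helper, and in this model the selector returns an array of replacement values rather than an Observable.

```javascript
// Model of catch: values emitted before the error are kept, then the
// replacement returned by the selector continues the sequence.
// Hypothetical helper for illustration; it does not use RxJS.
function runWithCatch(values, project, selector) {
  var out = [];
  try {
    values.forEach(function (v) {
      out.push(project(v));
    });
  } catch (err) {
    // If the selector itself throws, the error propagates to the caller,
    // which corresponds to the error callback of the subscriber.
    selector(err).forEach(function (v) {
      out.push(v);
    });
  }
  return out;
}

function failOnFour(n) {
  if (n === 4) {
    throw 'four!';
  }
  return n;
}

var recovered = runWithCatch([1, 2, 3, 4, 5], failOnFour, function (err) {
  return ['I', 'II', 'III', 'IV', 'V'];
});
console.log(recovered);
```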
public combineAll(project: function): Observable source
Converts a higher-order Observable into a first-order Observable by waiting for the outer Observable to complete, then applying combineLatest.
Flattens an Observable-of-Observables by applying combineLatest when the Observable-of-Observables completes.
Takes an Observable of Observables, and collects all Observables from it. Once the outer Observable completes, it subscribes to all collected Observables and combines their values using the combineLatest strategy, such that:
- Every time an inner Observable emits, the output Observable emits.
- When the returned observable emits, it emits all of the latest values by:
  - If a project function is provided, it is called with each recent value from each inner Observable in whatever order they arrived, and the result of the project function is what is emitted by the output Observable.
  - If there is no project function, an array of all of the most recent values is emitted by the output Observable.
Params:
Name | Type | Attribute | Description |
project | function | An optional function to map the most recent values from each inner Observable into a new result. Takes each of the most recent values from each collected inner Observable as arguments, in order. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map(ev =>
Rx.Observable.interval(Math.random()*2000).take(3)
).take(2);
var result = higherOrder.combineAll();
result.subscribe(x => console.log(x));
public combineLatest(other: ObservableInput, project: function): Observable source
Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables.
Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.
combineLatest combines the values from this Observable with values from Observables passed as arguments. This is done by subscribing to each Observable, in order, and collecting an array of each of the most recent values any time any of the input Observables emits, then either taking that array and passing it as arguments to an optional project function and emitting the return value of that, or just emitting the array of recent values directly if there is no project function.
Params:
Name | Type | Attribute | Description |
other | ObservableInput | An input Observable to combine with the source Observable. More than one input Observable may be given as arguments. |
project | function | An optional function to project the values from the combined latest values into a new value on the output Observable. |
Return:
Observable | An Observable of projected values from the most recent values from each input Observable, or an array of the most recent values from each input Observable. |
Example:
var weight = Rx.Observable.of(70, 72, 76, 79, 75);
var height = Rx.Observable.of(1.76, 1.77, 1.78);
var bmi = weight.combineLatest(height, (w, h) => w / (h * h));
bmi.subscribe(x => console.log('BMI is ' + x));
// With output to console:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222
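The example prints only three lines because weight emits all five of its values synchronously on subscription, before height is subscribed, so only the last weight (75) is still "latest" when height starts emitting. The sketch below replays that ordering without RxJS. `combineLatestModel` and the `{ source, value }` emission shape are hypothetical names for illustration.

```javascript
// Model of combineLatest over a single ordered list of emissions.
// Each emission names the index of the source it came from.
// Hypothetical helper for illustration; it does not use RxJS.
function combineLatestModel(emissions, sourceCount, project) {
  var latest = new Array(sourceCount);
  var hasValue = [];
  var seen = 0; // number of sources that have emitted at least once
  var out = [];
  emissions.forEach(function (e) {
    if (!hasValue[e.source]) {
      hasValue[e.source] = true;
      seen++;
    }
    latest[e.source] = e.value;
    // Nothing is emitted until every source has produced a value.
    if (seen === sourceCount) {
      out.push(project ? project.apply(null, latest) : latest.slice());
    }
  });
  return out;
}

// Source 0 (weight) emits everything first, then source 1 (height).
var ordered = [70, 72, 76, 79, 75]
  .map(function (w) { return { source: 0, value: w }; })
  .concat([1.76, 1.77, 1.78].map(function (h) { return { source: 1, value: h }; }));

var bmiValues = combineLatestModel(ordered, 2, function (w, h) {
  return w / (h * h);
});
console.log(bmiValues);
```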
public concat(other: ObservableInput, scheduler: Scheduler): Observable source
Creates an output Observable which sequentially emits all values from every given input Observable after the current Observable.
Concatenates multiple Observables together by sequentially emitting their values, one Observable after the other.
Joins this Observable with multiple other Observables by subscribing to them one at a time, starting with the source, and merging their results into the output Observable. Will wait for each Observable to complete before moving on to the next.
Params:
Name | Type | Attribute | Description |
other | ObservableInput | An input Observable to concatenate after the source Observable. More than one input Observable may be given as arguments. |
scheduler | Scheduler | An optional IScheduler to schedule each Observable subscription on. |
Return:
Observable | All values of each passed Observable merged into a single Observable, in order, in serial fashion. |
Example:
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = timer.concat(sequence);
result.subscribe(x => console.log(x));
// results in:
// 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var result = timer1.concat(timer2, timer3);
result.subscribe(x => console.log(x));
// results in the following:
// (Prints to console sequentially)
// -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
// -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
// -500ms-> 0 -500ms-> 1 -500ms-> ... 9
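The timing annotations in these examples follow from one rule: each input is subscribed only when the previous one completes, so its emission times are shifted by the total duration of everything before it. A minimal sketch of that rule, using a hypothetical `concatModel` helper and a made-up description of each source as `{ values, completeAt }` with times relative to its own subscription:

```javascript
// Model of concat: shift each source by the completion time of its predecessors.
// Hypothetical helper for illustration; it does not use RxJS.
function concatModel(sources) {
  var offset = 0;
  var out = [];
  sources.forEach(function (src) {
    src.values.forEach(function (e) {
      out.push({ t: offset + e.t, v: e.v });
    });
    // The next source is subscribed when this one completes.
    offset += src.completeAt;
  });
  return out;
}

// interval(1000).take(4) followed by a synchronous range(1, 3).
var timerSource = {
  values: [{ t: 1000, v: 0 }, { t: 2000, v: 1 }, { t: 3000, v: 2 }, { t: 4000, v: 3 }],
  completeAt: 4000
};
var rangeSource = {
  values: [{ t: 0, v: 1 }, { t: 0, v: 2 }, { t: 0, v: 3 }],
  completeAt: 0
};
var concatenated = concatModel([timerSource, rangeSource]);
console.log(concatenated.map(function (e) { return e.t + 'ms: ' + e.v; }));
```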
public concatAll(): Observable source
Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.
Flattens an Observable-of-Observables by putting one inner Observable after the other.
Joins every Observable emitted by the source (a higher-order Observable), in a serial fashion. It subscribes to each inner Observable only after the previous inner Observable has completed, and merges all of their values into the returned observable.
Warning: If the source Observable emits Observables quickly and endlessly, and the inner Observables it emits generally complete slower than the source emits, you can run into memory issues as the incoming Observables collect in an unbounded buffer.
Note: concatAll is equivalent to mergeAll with concurrency parameter set to 1.
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map(ev => Rx.Observable.interval(1000).take(4));
var firstOrder = higherOrder.concatAll();
firstOrder.subscribe(x => console.log(x));
// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
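The memory warning above comes from inner Observables queuing up while an earlier one is still running. The sketch below estimates how long each inner Observable waits before it is subscribed. `concatAllModel` is a hypothetical helper, and it assumes every inner Observable runs for the same fixed duration once subscribed.

```javascript
// Model of concatAll scheduling: an inner Observable is subscribed when it
// arrives or when the previous inner completes, whichever is later.
// Hypothetical helper for illustration; it does not use RxJS.
function concatAllModel(arrivals, innerDuration) {
  var freeAt = 0; // time at which the previously subscribed inner completes
  return arrivals.map(function (t) {
    var start = Math.max(t, freeAt);
    freeAt = start + innerDuration;
    return { arrived: t, subscribed: start, waited: start - t };
  });
}

// Clicks every second, each mapped to interval(1000).take(4) (4000 ms long).
var schedule = concatAllModel([0, 1000, 2000], 4000);
console.log(schedule);
```

Because clicks arrive faster than the inner Observables complete, the waiting time grows with each click, which is the unbounded buffering the warning refers to.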
public concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source
Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.
Maps each value to an Observable, then flattens all of these inner Observables using concatAll.
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns a (so-called "inner") Observable. Each new inner Observable is concatenated with the previous inner Observable.
Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.
Note: concatMap is equivalent to mergeMap with concurrency parameter set to 1.
Params:
Name | Type | Attribute | Description |
project | function(value: T, ?index: number): ObservableInput | A function that, when applied to an item emitted by the source Observable, returns an Observable. |
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any | A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are: |
Return:
Observable | An Observable that emits the result of applying the
projection function (and the optional |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMap(ev => Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));
// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
public concatMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source
Projects each source value to the same Observable which is merged multiple times in a serialized fashion on the output Observable.
It's like concatMap, but maps each value always to the same inner Observable.
Maps each source value to the given Observable innerObservable regardless of the source value, and then flattens those resulting Observables into one single Observable, which is the output Observable. Each new innerObservable instance emitted on the output Observable is concatenated with the previous innerObservable instance.
Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.
Note: concatMapTo is equivalent to mergeMapTo with concurrency parameter set to 1.
Params:
Name | Type | Attribute | Description |
innerObservable | ObservableInput | An Observable to replace each value from the source Observable. |
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any | A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are: |
Return:
Observable | An observable of values merged together by joining the passed observable with itself, one after the other, for each value emitted from the source. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMapTo(Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));
// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
public count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable source
Counts the number of emissions on the source and emits that number when the source completes.
Tells how many values were emitted, when the source completes.
count transforms an Observable that emits values into an Observable that emits a single value that represents the number of values emitted by the source Observable. If the source Observable terminates with an error, count will pass this error notification along without emitting a value first. If the source Observable does not terminate at all, count will neither emit a value nor terminate. This operator takes an optional predicate function as argument, in which case the output emission will represent the number of source values that matched true with the predicate.
Params:
Name | Type | Attribute | Description |
predicate | function(value: T, i: number, source: Observable<T>): boolean | A boolean function to select what values are to be counted. It is provided with arguments of: |
Example:
// Counts how many seconds have passed before the first click happened
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var secondsBeforeClick = seconds.takeUntil(clicks);
var result = secondsBeforeClick.count();
result.subscribe(x => console.log(x));
// Counts how many odd numbers there are between 1 and 7
var numbers = Rx.Observable.range(1, 7);
var result = numbers.count(i => i % 2 === 1);
result.subscribe(x => console.log(x));
// Results in:
// 4
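The counting rule described above can be sketched without RxJS by modeling a completed source as a plain array. This is only an illustration: countSketch is a hypothetical helper, not part of the library, and it ignores errors and never-ending sources.

```javascript
// Hypothetical helper: a source that has already completed is modeled as an array.
function countSketch(values, predicate) {
  let total = 0;
  values.forEach((value, i) => {
    // Without a predicate every value is counted.
    if (!predicate || predicate(value, i, values)) {
      total += 1;
    }
  });
  return total; // the single value that would be emitted on completion
}

console.log(countSketch([1, 2, 3, 4, 5, 6, 7], x => x % 2 === 1)); // 4
console.log(countSketch([])); // 0
```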
public debounce(durationSelector: function(value: T): SubscribableOrPromise): Observable source
Emits a value from the source Observable only after a particular time span determined by another Observable has passed without another source emission.
It's like debounceTime, but the time span of emission silence is determined by a second Observable.
debounce delays values emitted by the source Observable, but drops previous pending delayed emissions if a new value arrives on the source Observable. This operator keeps track of the most recent value from the source Observable, and spawns a duration Observable by calling the durationSelector function. The value is emitted only when the duration Observable emits a value or completes, and if no other value was emitted on the source Observable since the duration Observable was spawned. If a new value appears before the duration Observable emits, the previous value will be dropped and will not be emitted on the output Observable.
Like debounceTime, this is a rate-limiting operator, and also a delay-like operator since output emissions do not necessarily occur at the same time as they did on the source Observable.
Params:
Name | Type | Attribute | Description |
durationSelector | function(value: T): SubscribableOrPromise | A function that receives a value from the source Observable, for computing the timeout duration for each source value, returned as an Observable or a Promise. |
Return:
Observable | An Observable that delays the emissions of the source Observable by the specified duration Observable returned by durationSelector. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounce(() => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
public debounceTime(dueTime: number, scheduler: Scheduler): Observable source
Emits a value from the source Observable only after a particular time span has passed without another source emission.
It's like delay, but passes only the most recent value from each burst of emissions.
debounceTime delays values emitted by the source Observable, but drops previous pending delayed emissions if a new value arrives on the source Observable. This operator keeps track of the most recent value from the source Observable, and emits that only when dueTime has passed without any other value appearing on the source Observable. If a new value appears before dueTime of silence has elapsed, the previous value will be dropped and will not be emitted on the output Observable.
This is a rate-limiting operator, because it is impossible for more than one value to be emitted in any time window of duration dueTime, but it is also a delay-like operator since output emissions do not occur at the same time as they did on the source Observable. Optionally takes an IScheduler for managing timers.
Params:
Name | Type | Attribute | Description |
dueTime | number | | The timeout duration in milliseconds (or the time unit determined internally by the optional scheduler). |
scheduler | Scheduler | optional | The IScheduler to use for managing the timers that handle the timeout for each value. |
Return:
Observable | An Observable that delays the emissions of the source Observable by the specified dueTime. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounceTime(1000);
result.subscribe(x => console.log(x));
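To make the dropping rule concrete, here is a minimal sketch that replays timestamped values on a virtual timeline instead of using real timers. debounceTimeSketch and its { time, value } event shape are assumptions made for illustration; the sketch also assumes the source never completes and does not model exact ties.

```javascript
// Hypothetical helper working on virtual time, not real timers.
// events: [{ time, value }] sorted by time; the source is assumed never to complete.
function debounceTimeSketch(events, dueTime) {
  const output = [];
  events.forEach((event, i) => {
    const next = events[i + 1];
    const emitAt = event.time + dueTime;
    // A pending value is dropped when a newer value arrives before dueTime elapses.
    // (Exact ties are not modeled here.)
    if (!next || next.time > emitAt) {
      output.push({ time: emitAt, value: event.value });
    }
  });
  return output;
}

const debounced = debounceTimeSketch(
  [
    { time: 0, value: 'a' },
    { time: 200, value: 'b' },
    { time: 500, value: 'c' },
    { time: 2000, value: 'd' }
  ],
  1000
);
console.log(debounced); // 'c' at 1500 ms, then 'd' at 3000 ms
```

Values 'a' and 'b' are dropped because a newer value arrives within 1000 ms of each.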
public defaultIfEmpty(defaultValue: any): Observable source
Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.
If the source Observable turns out to be empty, then this operator will emit a default value.
defaultIfEmpty emits the values emitted by the source Observable or a specified default value if the source Observable is empty (completes without having emitted any next value).
Params:
Name | Type | Attribute | Description |
defaultValue | any | optional | The default value used if the source Observable is empty. |
Return:
Observable | An Observable that emits either the specified defaultValue if the source Observable is empty, or the values emitted by the source Observable. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksBeforeFive = clicks.takeUntil(Rx.Observable.interval(5000));
var result = clicksBeforeFive.defaultIfEmpty('no clicks');
result.subscribe(x => console.log(x));
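The two outcomes can be sketched by modeling a completed source as an array of its next values. defaultIfEmptySketch is a hypothetical helper for illustration only, not the library implementation.

```javascript
// Hypothetical helper: a completed source is modeled as an array of its next values.
function defaultIfEmptySketch(values, defaultValue) {
  // An empty source yields just the default; otherwise the source is mirrored.
  return values.length === 0 ? [defaultValue] : values.slice();
}

console.log(defaultIfEmptySketch([], 'no clicks'));        // only the default value
console.log(defaultIfEmptySketch(['click'], 'no clicks')); // the source values, unchanged
```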
public delay(delay: number | Date, scheduler: Scheduler): Observable source
Delays the emission of items from the source Observable by a given timeout or until a given Date.
Time shifts each item by some specified amount of milliseconds.
If the delay argument is a Number, this operator time shifts the source Observable by that amount of time expressed in milliseconds. The relative time intervals between the values are preserved.
If the delay argument is a Date, this operator time shifts the start of the Observable execution until the given date occurs.
Params:
Name | Type | Attribute | Description |
delay | number | Date | | The delay duration in milliseconds (a number) or a Date until which the emission of the source items is delayed. |
scheduler | Scheduler | optional | The IScheduler to use for managing the timers that handle the time-shift for each item. |
Return:
Observable | An Observable that delays the emissions of the source Observable by the specified timeout or Date. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
delayedClicks.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var date = new Date('March 15, 2050 12:00:00'); // in the future
var delayedClicks = clicks.delay(date); // click emitted only after that date
delayedClicks.subscribe(x => console.log(x));
public delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable source
Delays the emission of items from the source Observable by a given time span determined by the emissions of another Observable.
It's like delay, but the time span of the delay duration is determined by a second Observable.
delayWhen time shifts each emitted value from the source Observable by a time span determined by another Observable. When the source emits a value, the delayDurationSelector function is called with the source value as argument, and should return an Observable, called the "duration" Observable. The source value is emitted on the output Observable only when the duration Observable emits a value or completes.
Optionally, delayWhen takes a second argument, subscriptionDelay, which is an Observable. When subscriptionDelay emits its first value or completes, the source Observable is subscribed to and starts behaving as described in the previous paragraph. If subscriptionDelay is not provided, delayWhen will subscribe to the source Observable as soon as the output Observable is subscribed.
Params:
Name | Type | Attribute | Description |
delayDurationSelector | function(value: T): Observable | | A function that returns an Observable for each value emitted by the source Observable, which is then used to delay the emission of that item on the output Observable until the Observable returned from this function emits a value. |
subscriptionDelay | Observable | optional | An Observable that triggers the subscription to the source Observable once it emits any value. |
Return:
Observable | An Observable that delays the emissions of the source Observable by an amount of time specified by the Observable returned by delayDurationSelector. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delayWhen(event =>
Rx.Observable.interval(Math.random() * 5000)
);
delayedClicks.subscribe(x => console.log(x));
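Because each value gets its own delay, values may leave in a different order than they arrived. The sketch below shows this on a virtual timeline. It is a simplification under stated assumptions: delayWhenSketch is hypothetical, and the duration Observable is replaced by a plain number of milliseconds returned by durationOf.

```javascript
// Hypothetical helper on virtual time.
// events: [{ time, value }]; durationOf(value) returns a delay in milliseconds,
// standing in for the moment the duration Observable would first emit.
function delayWhenSketch(events, durationOf) {
  return events
    .map(event => ({ time: event.time + durationOf(event.value), value: event.value }))
    .sort((a, b) => a.time - b.time); // output order follows emission time
}

const delayed = delayWhenSketch(
  [
    { time: 0, value: 'slow' },
    { time: 100, value: 'fast' }
  ],
  value => (value === 'slow' ? 1000 : 50)
);
console.log(delayed); // 'fast' at 150 ms, then 'slow' at 1000 ms
```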
public dematerialize(): Observable source
Converts an Observable of Notification objects into the emissions that they represent.
Unwraps Notification objects as actual next, error and complete emissions. The opposite of materialize.
dematerialize is assumed to operate on an Observable that only emits Notification objects as next emissions, and does not emit any error. Such an Observable is the output of a materialize operation. Those notifications are then unwrapped using the metadata they contain, and emitted as next, error, and complete on the output Observable.
Use this operator in conjunction with materialize.
Return:
Observable | An Observable that emits items and notifications embedded in Notification objects emitted by the source Observable. |
Example:
var notifA = new Rx.Notification('N', 'A');
var notifB = new Rx.Notification('N', 'B');
var notifE = new Rx.Notification('E', void 0,
new TypeError('x.toUpperCase is not a function')
);
var materialized = Rx.Observable.of(notifA, notifB, notifE);
var upperCase = materialized.dematerialize();
upperCase.subscribe(x => console.log(x), e => console.error(e));
// Results in:
// A
// B
// TypeError: x.toUpperCase is not a function
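The unwrapping step can be sketched as a dispatch on the notification kind. This is an illustration only: dematerializeSketch is hypothetical, and the notifications are plain objects with kind, value, and error fields rather than Rx.Notification instances.

```javascript
// Hypothetical stand-in: notifications are plain objects, not Rx.Notification instances.
function dematerializeSketch(notifications, observer) {
  for (const n of notifications) {
    if (n.kind === 'N') {
      observer.next(n.value);
    } else if (n.kind === 'E') {
      observer.error(n.error);
      return; // an error ends the output
    } else if (n.kind === 'C') {
      observer.complete();
      return; // completion ends the output
    }
  }
}

const dematerializedLog = [];
dematerializeSketch(
  [
    { kind: 'N', value: 'A' },
    { kind: 'N', value: 'B' },
    { kind: 'E', error: new TypeError('x.toUpperCase is not a function') },
    { kind: 'N', value: 'never delivered' }
  ],
  {
    next: value => dematerializedLog.push('next ' + value),
    error: err => dematerializedLog.push('error ' + err.message),
    complete: () => dematerializedLog.push('complete')
  }
);
console.log(dematerializedLog);
```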
public distinct(keySelector: function, flushes: Observable): Observable source
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
If a keySelector function is provided, then it will project each value from the source observable into a new value that it will check for equality with previously projected values. If a keySelector function is not provided, it will use each value from the source observable directly with an equality check against previous values.
In JavaScript runtimes that support Set, this operator will use a Set to improve performance of the distinct value checking. In other runtimes, this operator will use a minimal implementation of Set that relies on an Array and indexOf under the hood, so performance will degrade as more values are checked for distinction. Even in newer browsers, a long-running distinct use might result in memory leaks. To help alleviate this in some scenarios, an optional flushes parameter is also provided so that the internal Set can be "flushed", basically clearing it of values.
Params:
Name | Type | Attribute | Description |
keySelector | function | optional | Optional function to select which value you want to check as distinct. |
flushes | Observable | optional | Optional Observable for flushing the internal HashSet of the operator. |
Example:
Observable.of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)
.distinct()
.subscribe(x => console.log(x)); // 1, 2, 3, 4
interface Person {
age: number,
name: string
}
Observable.of<Person>(
{ age: 4, name: 'Foo'},
{ age: 7, name: 'Bar'},
{ age: 5, name: 'Foo'})
.distinct((p: Person) => p.name)
.subscribe(x => console.log(x));
// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
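The Set-based check described above can be sketched over a plain array. distinctSketch is a hypothetical helper, not the operator itself, and the flushes parameter is not modeled.

```javascript
// Hypothetical helper: the source is modeled as an array; flushes is not modeled.
function distinctSketch(values, keySelector) {
  const seen = new Set();
  const output = [];
  for (const value of values) {
    // With a keySelector, distinctness is decided on the projected key.
    const key = keySelector ? keySelector(value) : value;
    if (!seen.has(key)) {
      seen.add(key);
      output.push(value);
    }
  }
  return output;
}

console.log(distinctSketch([1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1])); // 1, 2, 3, 4
console.log(distinctSketch(
  [{ age: 4, name: 'Foo' }, { age: 7, name: 'Bar' }, { age: 5, name: 'Foo' }],
  p => p.name
)); // the first 'Foo' and 'Bar' only
```

Note that seen only grows, which is the memory concern the flushes parameter is meant to address.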
public distinctUntilChanged(compare: function): Observable source
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.
If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.
If a comparator function is not provided, an equality check is used by default.
Params:
Name | Type | Attribute | Description |
compare | function | optional | Optional comparison function called to test if an item is distinct from the previous item in the source. |
Example:
Observable.of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4)
.distinctUntilChanged()
.subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4
interface Person {
age: number,
name: string
}
Observable.of<Person>(
    { age: 4, name: 'Foo'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo'},
    { age: 6, name: 'Foo'})
    .distinctUntilChanged((p: Person, q: Person) => p.name === q.name)
    .subscribe(x => console.log(x));
// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }
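Unlike distinct, only the neighboring item matters here. A minimal sketch over an array, assuming that each item is compared with the last value that was let through; distinctUntilChangedSketch is a hypothetical helper, not the library code.

```javascript
// Hypothetical helper: the source is modeled as an array.
function distinctUntilChangedSketch(values, compare) {
  const same = compare || ((a, b) => a === b); // default: strict equality
  const output = [];
  let hasLast = false;
  let last;
  for (const value of values) {
    // Assumption of this sketch: compare against the last value that was let through.
    if (!hasLast || !same(last, value)) {
      output.push(value);
      last = value;
      hasLast = true;
    }
  }
  return output;
}

console.log(distinctUntilChangedSketch([1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4])); // 1, 2, 1, 2, 3, 4
```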
public distinctUntilKeyChanged(key: string, compare: function): Observable source
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item, using a property accessed by using the key provided to check if the two items are distinct.
If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.
If a comparator function is not provided, an equality check is used by default.
Return:
Observable | An Observable that emits items from the source Observable with distinct values based on the key specified. |
Example:
interface Person {
age: number,
name: string
}
Observable.of<Person>(
{ age: 4, name: 'Foo'},
{ age: 7, name: 'Bar'},
{ age: 5, name: 'Foo'},
{ age: 6, name: 'Foo'})
.distinctUntilKeyChanged('name')
.subscribe(x => console.log(x));
// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }
interface Person {
age: number,
name: string
}
Observable.of<Person>(
{ age: 4, name: 'Foo1'},
{ age: 7, name: 'Bar'},
{ age: 5, name: 'Foo2'},
{ age: 6, name: 'Foo3'})
.distinctUntilKeyChanged('name', (x: string, y: string) => x.substring(0, 3) === y.substring(0, 3))
.subscribe(x => console.log(x));
// displays:
// { age: 4, name: 'Foo1' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo2' }
public do(nextOrObserver: Observer | function, error: function, complete: function): Observable source
Perform a side effect for every emission on the source Observable, but return an Observable that is identical to the source.
Intercepts each emission on the source and runs a function, but returns an output which is identical to the source as long as errors don't occur.
Returns a mirrored Observable of the source Observable, but modified so that the provided Observer is called to perform a side effect for every value, error, and completion emitted by the source. Any errors that are thrown in the aforementioned Observer or handlers are safely sent down the error path of the output Observable.
This operator is useful for debugging your Observables for the correct values or performing other side effects.
Note: this is different to a subscribe on the Observable. If the Observable returned by do is not subscribed, the side effects specified by the Observer will never happen. do therefore simply spies on existing execution, it does not trigger an execution to happen like subscribe does.
Return:
Observable | An Observable identical to the source, but runs the specified Observer or callback(s) for each item. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks
.do(ev => console.log(ev))
.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));
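The note about laziness can be illustrated with a tiny stand-in for an Observable. Everything here is an assumption made for the sketch: a source is modeled as a function that pushes values into a callback, and fromArraySketch and doSketch are hypothetical helpers.

```javascript
// A source is modeled as a function that pushes values into a callback when called.
function fromArraySketch(values) {
  return next => values.forEach(value => next(value));
}

// Returns a new source that runs sideEffect before forwarding each value unchanged.
function doSketch(source, sideEffect) {
  return next => source(value => {
    sideEffect(value);
    next(value);
  });
}

const spyLog = [];
const spied = doSketch(fromArraySketch([1, 2, 3]), value => spyLog.push('saw ' + value));
const loggedBeforeSubscribe = spyLog.length; // 0: nothing runs until "subscription"

const received = [];
spied(value => received.push(value)); // calling the source plays the role of subscribe
console.log(loggedBeforeSubscribe, spyLog, received);
```

The side effect runs once per value, and the values reaching the consumer are unchanged.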
public elementAt(index: number, defaultValue: T): Observable source
Emits the single value at the specified index in a sequence of emissions from the source Observable.
Emits only the i-th value, then completes.
elementAt returns an Observable that emits the item at the specified index in the source Observable, or a default value if that index is out of range and the default argument is provided. If the default argument is not given and the index is out of range, the output Observable will emit an ArgumentOutOfRangeError error.
Params:
Name | Type | Attribute | Description |
index | number | | Is the number i for the i-th source emission, counted from zero. |
defaultValue | T | optional | The default value returned for missing indices. |
Return:
Observable | An Observable that emits a single item, if it is found. Otherwise, will emit the default value if given. If not, then emits an error. |
Throw:
ArgumentOutOfRangeError | When using elementAt(index) without a default value and the index is out of range. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.elementAt(2);
result.subscribe(x => console.log(x));
// Results in:
// click 1 = nothing
// click 2 = nothing
// click 3 = MouseEvent object logged to console
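The three outcomes (value found, default used, error) can be sketched over an array standing in for a completed source. elementAtSketch is hypothetical, it assumes a non-negative index, and a built-in RangeError stands in for ArgumentOutOfRangeError.

```javascript
// Hypothetical helper: a completed source is modeled as an array.
// Assumes index >= 0; negative indexes are not modeled.
function elementAtSketch(values, index, defaultValue) {
  const hasDefault = arguments.length >= 3;
  if (index < values.length) {
    return values[index]; // zero-based, so index 2 is the third value
  }
  if (hasDefault) {
    return defaultValue;
  }
  // RangeError stands in for the ArgumentOutOfRangeError notification.
  throw new RangeError('index out of range');
}

console.log(elementAtSketch(['a', 'b', 'c'], 2)); // 'c'
console.log(elementAtSketch(['a'], 2, 'none'));   // 'none'
```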
public every(predicate: function, thisArg: any): Observable source
Returns an Observable that emits whether or not every item of the source satisfies the condition specified.
Params:
Name | Type | Attribute | Description |
predicate | function | | A function for determining if an item meets a specified condition. |
thisArg | any | optional | Optional object to use for this in the callback. |
Return:
Observable | An Observable of booleans that determines if all items of the source Observable meet the condition specified. |
Example:
Observable.of(1, 2, 3, 4, 5, 6)
.every(x => x < 5)
.subscribe(x => console.log(x)); // -> false
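As a rough sketch, the check can stop at the first failing item. everySketch is a hypothetical helper over an array; the checked field is added only to show how many items were inspected before the result was known.

```javascript
// Hypothetical helper: the source is modeled as an array.
function everySketch(values, predicate) {
  let checked = 0;
  for (const value of values) {
    checked += 1;
    if (!predicate(value, checked - 1)) {
      return { result: false, checked }; // first failure decides the outcome
    }
  }
  return { result: true, checked }; // true only once the source has completed
}

console.log(everySketch([1, 2, 3, 4, 5, 6], x => x < 5)); // result false after 5 items
console.log(everySketch([1, 2, 3], x => x < 5));          // result true after 3 items
```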
public exhaust(): Observable
Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.
Flattens an Observable-of-Observables by dropping the next inner Observables while the current inner is still executing.
exhaust subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, the output Observable begins emitting the items emitted by that inner Observable. So far, it behaves like mergeAll. However, exhaust ignores every new inner Observable if the previous Observable has not yet completed. Once that one completes, it will accept and flatten the next inner Observable and repeat this process.
Return:
Observable | An Observable that takes a source of Observables and propagates the first observable exclusively until it completes before subscribing to the next. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(5));
var result = higherOrder.exhaust();
result.subscribe(x => console.log(x));
public exhaustMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.
Maps each value to an Observable, then flattens all of these inner Observables using exhaust.
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns a (so-called "inner") Observable. When it projects a source value to an Observable, the output Observable begins emitting the items emitted by that projected Observable. However, exhaustMap ignores every new projected Observable if the previous projected Observable has not yet completed. Once that one completes, it will accept and flatten the next projected Observable and repeat this process.
Params:
Name | Type | Attribute | Description |
project | function(value: T, ?index: number): ObservableInput | | A function that, when applied to an item emitted by the source Observable, returns an Observable. |
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any | optional | A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are outerValue, innerValue, outerIndex, and innerIndex. |
Return:
Observable | An Observable containing projected Observables of each item of the source, ignoring projected Observables that start before their preceding Observable has completed. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.exhaustMap((ev) => Rx.Observable.interval(1000).take(5));
result.subscribe(x => console.log(x));
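Which clicks start a new inner Observable in the example above depends only on timing. The sketch below models that rule on a simulated timeline in plain JavaScript; it is not RxJS code, exhaustModel and the click times are made up for illustration, and each accepted click is assumed to keep its inner Observable active for a fixed 5000 ms (roughly what interval(1000).take(5) does).

```javascript
// Timeline model of the exhaust rule; it does not use RxJS or real timers.
// `sourceTimes` are the moments (in ms) at which the source emits.
function exhaustModel(sourceTimes, innerDurationMs) {
  const accepted = [];
  const dropped = [];
  let busyUntil = -Infinity;
  for (const at of sourceTimes) {
    if (at < busyUntil) {
      // The previous inner Observable is still running: ignore this value.
      dropped.push(at);
    } else {
      // Nothing is running: project this value and flatten its inner Observable.
      accepted.push(at);
      busyUntil = at + innerDurationMs;
    }
  }
  return { accepted, dropped };
}

const clickTimes = [0, 1000, 4000, 6000, 9000, 11500];
const outcome = exhaustModel(clickTimes, 5000);
console.log(outcome.accepted, outcome.dropped);
// prints: [ 0, 6000, 11500 ] [ 1000, 4000, 9000 ]
```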
public expand(project: function(value: T, index: number), concurrent: number, scheduler: Scheduler): Observable
Recursively projects each source value to an Observable which is merged in the output Observable.
It's similar to mergeMap, but applies the projection function to every source value as well as every output value. It's recursive.
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger. expand will re-emit on the output Observable every source value. Then, each output value is given to the project function which returns an inner Observable to be merged on the output Observable. Those output values resulting from the projection are also given to the project function to produce new output values. This is how expand behaves recursively.
Params:
Name | Type | Attribute | Description |
project | function(value: T, index: number) | | A function that, when applied to an item emitted by the source or the output Observable, returns an Observable. |
concurrent | number | optional | Maximum number of input Observables being subscribed to concurrently. |
scheduler | Scheduler | optional | The IScheduler to use for subscribing to each projected inner Observable. |
Return:
Observable | An Observable that emits the source values and also the result of applying the projection function to each value emitted on the output Observable and merging the results of the Observables obtained from this transformation. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var powersOfTwo = clicks
.mapTo(1)
.expand(x => Rx.Observable.of(2 * x).delay(1000))
.take(10);
powersOfTwo.subscribe(x => console.log(x));
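The recursion in expand is easier to follow when timing is taken out of the picture. Below is a plain-JavaScript model, not RxJS code: expandModel is a hypothetical helper in which project returns an array standing for the values of its inner Observable, everything is processed in arrival order, and limit plays the role of take. Real ordering in RxJS depends on when inner Observables emit, which this sketch does not capture.

```javascript
// Queue-based model of expand(); it does not use RxJS.
function expandModel(seeds, project, limit) {
  const output = [];
  const queue = [...seeds];
  while (queue.length > 0 && output.length < limit) {
    const value = queue.shift();
    // Every source value and every projected value is emitted on the output...
    output.push(value);
    // ...and is also fed back into project to produce further values.
    queue.push(...project(value, output.length - 1));
  }
  return output;
}

// Mirrors the example above: start at 1, keep doubling, take 10 values.
const powers = expandModel([1], x => [2 * x], 10);
// The recursion ends on its own once project returns nothing.
const bounded = expandModel([1], x => (x < 8 ? [2 * x] : []), 100);
console.log(powers, bounded);
// prints: [ 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 ] [ 1, 2, 4, 8 ]
```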
public filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable
Filter items emitted by the source Observable by only emitting those that satisfy a specified predicate.
Like Array.prototype.filter(), it only emits a value from the source if it passes a criterion function.
Similar to the well-known Array.prototype.filter method, this operator takes values from the source Observable, passes them through a predicate function and only emits those values that yielded true.
Params:
Name | Type | Attribute | Description |
predicate | function(value: T, index: number): boolean | | A function that evaluates each value emitted by the source Observable. If it returns true, the value is emitted; if false, the value is not passed to the output Observable. |
thisArg | any | optional | An optional argument to determine the value of this in the predicate function. |
Return:
Observable | An Observable of values from the source that were allowed by the predicate function. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksOnDivs = clicks.filter(ev => ev.target.tagName === 'DIV');
clicksOnDivs.subscribe(x => console.log(x));
public finalize(callback: function): Observable
Returns an Observable that mirrors the source Observable, but will call a specified function when the source terminates on complete or error.
Params:
Name | Type | Attribute | Description |
callback | function | | Function to be called when source terminates. |
Return:
Observable | An Observable that mirrors the source, but will call the specified function on termination. |
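This entry has no example, so here is a small sketch of the behavior it describes. It is a plain-JavaScript model, not RxJS code: a source is represented as a function that pushes notifications into an observer, and finalizeModel, completing, and failing are hypothetical names. The model assumes the callback runs after the terminal notification has been passed on.

```javascript
// Plain-JavaScript model of finalize(); it does not use RxJS.
function finalizeModel(source, callback) {
  return observer => {
    let called = false;
    const runOnce = () => {
      if (!called) {
        called = true;
        callback();
      }
    };
    source({
      // Values are mirrored unchanged.
      next: value => observer.next(value),
      // On either kind of termination, forward it and then run the callback.
      error: err => { observer.error(err); runOnce(); },
      complete: () => { observer.complete(); runOnce(); },
    });
  };
}

const log = [];
const logger = {
  next: value => log.push('next ' + value),
  error: err => log.push('error ' + err.message),
  complete: () => log.push('complete'),
};

const completing = observer => { observer.next(1); observer.next(2); observer.complete(); };
const failing = observer => { observer.next(1); observer.error(new Error('boom')); };

finalizeModel(completing, () => log.push('finalized'))(logger);
finalizeModel(failing, () => log.push('finalized'))(logger);
console.log(log.join(', '));
// prints: next 1, next 2, complete, finalized, next 1, error boom, finalized
```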
public find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T>
Emits only the first value emitted by the source Observable that meets some condition.
Finds the first value that passes some test and emits that.
find searches for the first item in the source Observable that matches the specified condition embodied by the predicate, and returns the first occurrence in the source. Unlike first, the predicate is required in find, and it does not emit an error if a valid value is not found.
Params:
Name | Type | Attribute | Description |
predicate | function(value: T, index: number, source: Observable<T>): boolean | | A function called with each item to test for condition matching. |
thisArg | any | optional | An optional argument to determine the value of this in the predicate function. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.find(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));
public findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable
Emits only the index of the first value emitted by the source Observable that meets some condition.
It's like find, but emits the index of the found value, not the value itself.
findIndex searches for the first item in the source Observable that matches the specified condition embodied by the predicate, and returns the (zero-based) index of the first occurrence in the source. Unlike first, the predicate is required in findIndex, and it does not emit an error if a valid value is not found.
Params:
Name | Type | Attribute | Description |
predicate | function(value: T, index: number, source: Observable<T>): boolean | | A function called with each item to test for condition matching. |
thisArg | any | optional | An optional argument to determine the value of this in the predicate function. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.findIndex(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));
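find and findIndex share one search and differ only in what they report. The sketch below models both in plain JavaScript; it is not RxJS code and findModel is a hypothetical helper. It assumes that, when nothing matches, the value result is undefined and the index result is -1 (mirroring Array.prototype.find and findIndex) instead of an error.

```javascript
// Plain-JavaScript model of find() and findIndex(); it does not use RxJS.
// `values` stands for the emissions of a source that then completes.
function findModel(values, predicate) {
  for (let i = 0; i < values.length; i++) {
    if (predicate(values[i], i)) {
      // First match: find reports the value, findIndex reports its position.
      return { value: values[i], index: i };
    }
  }
  // No match before completion: no error, just "not found" results.
  return { value: undefined, index: -1 };
}

const tags = ['SPAN', 'DIV', 'P', 'DIV'];
const hit = findModel(tags, tag => tag === 'DIV');
const miss = findModel(tags, tag => tag === 'TABLE');
console.log(hit.value, hit.index, miss.value, miss.index);
// prints: DIV 1 undefined -1
```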
public first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R>
Emits only the first value (or the first value that meets some condition) emitted by the source Observable.
Emits only the first value. Or emits only the first value that passes some test.
If called with no arguments, first emits the first value of the source Observable, then completes. If called with a predicate function, first emits the first value of the source that matches the specified condition. It may also take a resultSelector function to produce the output value from the input value, and a defaultValue to emit in case the source completes before it is able to emit a valid value. Throws an error if defaultValue was not provided and a matching element is not found.
Params:
Name | Type | Attribute | Description |
predicate | function(value: T, index: number, source: Observable<T>): boolean | optional | An optional function called with each item to test for condition matching. |
resultSelector | function(value: T, index: number): R | optional | A function to produce the value on the output Observable based on the values and the indices of the source Observable. The arguments passed to this function are value and index. |
defaultValue | R | optional | The default value emitted in case no valid value was found on the source. |
Return:
Observable<T | R> | An Observable of the first item that matches the condition. |
Throw:
EmptyError | Delivers an EmptyError to the Observer's error callback if the Observable completes before any next notification was sent. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first();
result.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));
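The examples above cover the no-argument and predicate forms only. As a rough illustration of how resultSelector and defaultValue fit in, here is a plain-JavaScript model; it is not RxJS code, firstModel is a hypothetical helper, and a default is treated as "provided" whenever it is not undefined.

```javascript
// Plain-JavaScript model of first(); it does not use RxJS.
// `values` stands for the emissions of a source that then completes.
function firstModel(values, predicate, resultSelector, defaultValue) {
  for (let i = 0; i < values.length; i++) {
    if (!predicate || predicate(values[i], i)) {
      // First acceptable value: optionally transform it, emit, and complete.
      const value = resultSelector ? resultSelector(values[i], i) : values[i];
      return { kind: 'next', value };
    }
  }
  // The source completed without an acceptable value.
  return defaultValue !== undefined
    ? { kind: 'next', value: defaultValue }
    : { kind: 'error', error: 'EmptyError' };
}

const plain = firstModel([3, 8, 12]);
const matched = firstModel([3, 8, 12], x => x > 5);
const selected = firstModel([3, 8, 12], x => x > 5, (value, index) => index + ':' + value);
const fallback = firstModel([3, 8, 12], x => x > 50, undefined, 'none');
const empty = firstModel([]);
console.log(plain.value, matched.value, selected.value, fallback.value, empty.error);
// prints: 3 8 1:8 none EmptyError
```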
public forEach(next: Function, PromiseCtor: PromiseConstructor): Promise
Params:
Name | Type | Attribute | Description |
next | Function | | A handler for each value emitted by the observable. |
PromiseCtor | PromiseConstructor | optional | A constructor function used to instantiate the Promise. |
Return:
Promise | a promise that either resolves on observable completion or rejects with the handled error |
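Since this entry has no example, the following sketch shows the Promise contract it describes: the handler sees each value, completion resolves the Promise, and an error rejects it. This is a plain-JavaScript model, not RxJS code. forEachModel, numbers, and failing are hypothetical names, the native Promise is always used (the PromiseCtor parameter is not modelled), and unsubscription after a failure is not modelled either.

```javascript
// Plain-JavaScript model of forEach(); it does not use RxJS.
// A source is a function that pushes notifications into an observer.
function forEachModel(source, next) {
  return new Promise((resolve, reject) => {
    source({
      next: value => {
        // An exception thrown by the handler also rejects the Promise.
        try { next(value); } catch (err) { reject(err); }
      },
      error: err => reject(err),
      complete: () => resolve(),
    });
  });
}

const seen = [];
const outcomes = [];
const numbers = observer => { [1, 2, 3].forEach(v => observer.next(v)); observer.complete(); };
const failing = observer => { observer.error(new Error('boom')); };

const done = forEachModel(numbers, v => seen.push(v))
  .then(() => outcomes.push('resolved'));
const failed = forEachModel(failing, v => seen.push(v))
  .then(() => outcomes.push('resolved'), err => outcomes.push('rejected: ' + err.message));

// Log once both Promises have settled.
Promise.all([done, failed]).then(() => console.log(seen, outcomes));
```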
public groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>>
Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.
Params:
Name | Type | Attribute | Description |
keySelector | function(value: T): K | | A function that extracts the key for each item. |
elementSelector | function(value: T): R | optional | A function that extracts the return element for each item. |
durationSelector | function(grouped: GroupedObservable<K, R>): Observable<any> | optional | A function that returns an Observable to determine how long each group should exist. |
Return:
Observable<GroupedObservable<K, R>> | An Observable that emits GroupedObservables, each of which corresponds to a unique key value and each of which emits those items from the source Observable that share that key value. |
Example:
Observable.of<Obj>({id: 1, name: 'aze1'},
{id: 2, name: 'sf2'},
{id: 2, name: 'dg2'},
{id: 1, name: 'erg1'},
{id: 1, name: 'df1'},
{id: 2, name: 'sfqfb2'},
{id: 3, name: 'qfs3'},
{id: 2, name: 'qsgqsfg2'}
)
.groupBy(p => p.id)
.flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], []))
.subscribe(p => console.log(p));
// displays:
// [ { id: 1, name: 'aze1' },
// { id: 1, name: 'erg1' },
// { id: 1, name: 'df1' } ]
//
// [ { id: 2, name: 'sf2' },
// { id: 2, name: 'dg2' },
// { id: 2, name: 'sfqfb2' },
// { id: 2, name: 'qsgqsfg2' } ]
//
// [ { id: 3, name: 'qfs3' } ]
Observable.of<Obj>({id: 1, name: 'aze1'},
{id: 2, name: 'sf2'},
{id: 2, name: 'dg2'},
{id: 1, name: 'erg1'},
{id: 1, name: 'df1'},
{id: 2, name: 'sfqfb2'},
{id: 3, name: 'qfs1'},
{id: 2, name: 'qsgqsfg2'}
)
.groupBy(p => p.id, p => p.name)
.flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], ["" + group$.key]))
.map(arr => ({'id': parseInt(arr[0]), 'values': arr.slice(1)}))
.subscribe(p => console.log(p));
// displays:
// { id: 1, values: [ 'aze1', 'erg1', 'df1' ] }
// { id: 2, values: [ 'sf2', 'dg2', 'sfqfb2', 'qsgqsfg2' ] }
// { id: 3, values: [ 'qfs1' ] }
public ignoreElements(): Observable
Ignores all items emitted by the source Observable and only passes calls of complete or error.
Return:
Observable | An empty Observable that only calls complete or error, based on which one is called by the source Observable. |
public isEmpty(): Observable
If the source Observable is empty it returns an Observable that emits true, otherwise it emits false.
public last(predicate: function): Observable
Returns an Observable that emits only the last item emitted by the source Observable. It optionally takes a predicate function as a parameter, in which case, rather than emitting the last item from the source Observable, the resulting Observable will emit the last item from the source Observable that satisfies the predicate.
Params:
Name | Type | Attribute | Description |
predicate | function | | The condition any source emitted item has to satisfy. |
Return:
Observable | An Observable that emits only the last item satisfying the given condition from the source, or an EmptyError if no such item is emitted. |
Throw:
EmptyError | Delivers an EmptyError to the Observer's error callback if the Observable completes before any next notification was sent. |
* | Throws if no items that match the predicate are emitted by the source Observable. |
public letProto(func: *): Observable<R>
Params:
Name | Type | Attribute | Description |
func | * |
public lift(operator: Operator): Observable
Creates a new Observable, with this Observable as the source, and the passed operator defined as the new observable's operator.
Params:
Name | Type | Attribute | Description |
operator | Operator | | The operator defining the operation to take on the observable. |
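The description of lift is terse, so the sketch below shows one way the "source plus operator" pairing can work. It is a heavily simplified plain-JavaScript model and not the library's implementation: MiniObservable and doubleOperator are hypothetical, and the operator is assumed to be an object with a call(observer, source) method that connects the observer to the source.

```javascript
// Simplified model of lift(); it does not use RxJS.
class MiniObservable {
  constructor(subscribe) {
    if (subscribe) this._subscribe = subscribe;
  }

  // Returns a new observable that remembers its source and its operator.
  lift(operator) {
    const observable = new MiniObservable();
    observable.source = this;
    observable.operator = operator;
    return observable;
  }

  subscribe(observer) {
    if (this.operator) {
      // A lifted observable lets the operator wire the observer to the source.
      this.operator.call(observer, this.source);
    } else {
      this._subscribe(observer);
    }
  }
}

// Hypothetical operator: doubles every value and forwards termination.
const doubleOperator = {
  call(observer, source) {
    source.subscribe({
      next: value => observer.next(value * 2),
      error: err => observer.error(err),
      complete: () => observer.complete(),
    });
  },
};

const source = new MiniObservable(observer => {
  observer.next(1);
  observer.next(2);
  observer.complete();
});
const doubled = source.lift(doubleOperator);

const out = [];
doubled.subscribe({
  next: value => out.push(value),
  error: () => out.push('error'),
  complete: () => out.push('done'),
});
console.log(out);
// prints: [ 2, 4, 'done' ]
```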
public map(project: function(value: T, index: number): R, thisArg: any): Observable<R>
Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable.
Like Array.prototype.map(), it passes each source value through a transformation function to get corresponding output values.
Similar to the well-known Array.prototype.map function, this operator applies a projection to each value and emits that projection in the output Observable.
Params:
Name | Type | Attribute | Description |
project | function(value: T, index: number): R | | The function to apply to each value emitted by the source Observable. |
thisArg | any | optional | An optional argument to define what this is in the project function. |
Return:
Observable<R> | An Observable that emits the values from the source Observable transformed by the given project function. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));
public mapTo(value: any): Observable
Emits the given constant value on the output Observable every time the source Observable emits a value.
Like map, but it maps every source value to the same output value every time.
Takes a constant value as argument, and emits that whenever the source Observable emits a value. In other words, ignores the actual source value, and simply uses the emission moment to know when to emit the given value.
Params:
Name | Type | Attribute | Description |
value | any | | The value to map each source value to. |
Return:
Observable | An Observable that emits the given value every time the source Observable emits something. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var greetings = clicks.mapTo('Hi');
greetings.subscribe(x => console.log(x));
public materialize(): Observable<Notification<T>> source
Represents all of the notifications from the source Observable as next emissions marked with their original types within Notification objects.
Wraps next, error and complete emissions in Notification objects, emitted as next on the output Observable.
materialize returns an Observable that emits a next notification for each next, error, or complete emission of the source Observable. When the source Observable emits complete, the output Observable will emit next as a Notification of type "complete", and then it will emit complete as well. When the source Observable emits error, the output will emit next as a Notification of type "error", and then complete.
This operator is useful for producing metadata of the source Observable, to be consumed as next emissions. Use it in conjunction with dematerialize.
Return:
Observable<Notification<T>> | An Observable that emits Notification objects that wrap the original emissions from the source Observable with metadata. |
Example:
var letters = Rx.Observable.of('a', 'b', 13, 'd');
var upperCase = letters.map(x => x.toUpperCase());
var materialized = upperCase.materialize();
materialized.subscribe(x => console.log(x));
// Results in the following:
// - Notification {kind: "N", value: "A", error: undefined, hasValue: true}
// - Notification {kind: "N", value: "B", error: undefined, hasValue: true}
// - Notification {kind: "E", value: undefined, error: TypeError:
// x.toUpperCase is not a function at MapSubscriber.letters.map.x
// [as project] (http://1…, hasValue: false}
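The wrapping behaviour of materialize can be sketched without RxJS. The snippet below is only an illustration under a stated assumption, not the library's implementation: a "source" is modelled as a plain function that pushes notifications into an observer object, and the names materialize, failingSource and seen are hypothetical.

```javascript
// Hypothetical model (an assumption for illustration, not RxJS itself):
// a "source" is a function that pushes notifications into an observer.
function materialize(source) {
  return function (observer) {
    source({
      // A value becomes a next emission of kind "N".
      next: value => observer.next({ kind: 'N', value: value }),
      // An error becomes a next emission of kind "E", followed by complete.
      error: err => {
        observer.next({ kind: 'E', error: err });
        observer.complete();
      },
      // Completion becomes a next emission of kind "C", followed by complete.
      complete: () => {
        observer.next({ kind: 'C' });
        observer.complete();
      }
    });
  };
}

// A source that emits two values and then fails.
function failingSource(observer) {
  observer.next('A');
  observer.next('B');
  observer.error(new TypeError('not a string'));
}

const seen = [];
materialize(failingSource)({
  next: notification => seen.push(notification.kind),
  error: () => seen.push('error'),
  complete: () => seen.push('complete')
});
console.log(seen.join(' ')); // N N E complete
```

Note that the observer's error callback is never reached: the failure arrives as an ordinary next emission, which is what makes the metadata consumable downstream.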
public max(comparer: Function): Observable source
The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when the source Observable completes, it emits a single item: the item with the largest value.
Params:
Name | Type | Attribute | Description |
comparer | Function | | Optional comparer function that it will use instead of its default to compare the value of two items. |
Example:
// Get the maximal value of a series of numbers
Rx.Observable.of(5, 4, 7, 2, 8)
  .max()
  .subscribe(x => console.log(x)); // -> 8
// Use a comparer function to get the maximal item (TypeScript)
interface Person {
  age: number,
  name: string
}
Observable.of<Person>({age: 7, name: 'Foo'},
                      {age: 5, name: 'Bar'},
                      {age: 9, name: 'Beer'})
  .max<Person>((a: Person, b: Person) => a.age < b.age ? -1 : 1)
  .subscribe((x: Person) => console.log(x.name)); // -> 'Beer'
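How a comparer picks the largest item can be sketched over a plain array. This is a minimal sketch, assuming max behaves like a reduce that keeps whichever item the comparer ranks higher; maxOf and defaultComparer are hypothetical names, not RxJS API.

```javascript
// Sketch only: assumes max behaves like a reduce that keeps the larger item.
// maxOf and defaultComparer are hypothetical names, not part of RxJS.
function defaultComparer(a, b) {
  return a > b ? 1 : -1;
}

function maxOf(items, comparer) {
  const compare = comparer || defaultComparer;
  // Keep `best` only when the comparer ranks it strictly above `item`.
  return items.reduce((best, item) => (compare(best, item) > 0 ? best : item));
}

const largestNumber = maxOf([5, 4, 7, 2, 8]);

const people = [
  { age: 7, name: 'Foo' },
  { age: 5, name: 'Bar' },
  { age: 9, name: 'Beer' }
];
const oldest = maxOf(people, (a, b) => (a.age < b.age ? -1 : 1));

console.log(largestNumber, oldest.name); // 8 Beer
```

The comparer follows the usual sort convention: a negative result means the first argument ranks lower, a positive result means it ranks higher.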
public merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable source
Creates an output Observable which concurrently emits all values from every given input Observable.
Flattens multiple Observables together by blending their values into one Observable.
merge subscribes to each given input Observable (either the source or an Observable given as argument), and simply forwards (without doing any transformation) all the values from all the input Observables to the output Observable. The output Observable only completes once all input Observables have completed. Any error delivered by an input Observable will be immediately emitted on the output Observable.
Params:
Name | Type | Attribute | Description |
other | ObservableInput | | An input Observable to merge with the source Observable. More than one input Observable may be given as an argument. |
concurrent | number | | Maximum number of input Observables being subscribed to concurrently. |
scheduler | Scheduler | | The IScheduler to use for managing concurrency of input Observables. |
Example:
// Merge together two Observables: clicks and a 1s interval
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = clicks.merge(timer);
clicksOrTimer.subscribe(x => console.log(x));
// Merge together three Observables, with at most two subscribed concurrently
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // the argument
var merged = timer1.merge(timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));
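The forwarding and completion rules of merge can be sketched without timers. The snippet below is an illustration under an assumption, not the RxJS implementation: a "source" is modelled as a plain function that pushes notifications into an observer, and manualSource is a hypothetical helper whose emissions are triggered by hand to stand in for timing.

```javascript
// Hypothetical model (not RxJS itself): a "source" is a function that
// pushes notifications into an observer with next/error/complete methods.
function merge(...sources) {
  return function (observer) {
    let active = sources.length;
    let stopped = false;
    if (active === 0) {
      observer.complete();
      return;
    }
    sources.forEach(source => source({
      // Values are forwarded untouched, in arrival order.
      next: value => { if (!stopped) observer.next(value); },
      // The first error ends the output immediately.
      error: err => {
        if (!stopped) { stopped = true; observer.error(err); }
      },
      // The output completes only after every input has completed.
      complete: () => {
        active -= 1;
        if (active === 0 && !stopped) { stopped = true; observer.complete(); }
      }
    }));
  };
}

// A source whose notifications are triggered by hand, to imitate timing.
function manualSource() {
  let target = null;
  const source = observer => { target = observer; };
  source.next = value => target.next(value);
  source.complete = () => target.complete();
  return source;
}

const a = manualSource();
const b = manualSource();
const log = [];
merge(a, b)({
  next: value => log.push(value),
  error: () => log.push('error'),
  complete: () => log.push('done')
});
a.next('a1');
b.next('b1');
a.complete();   // output stays open: b is still active
b.next('b2');
b.complete();   // last input completed, so the output completes
console.log(log.join(' ')); // a1 b1 b2 done
```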
public mergeAll(concurrent: number): Observable source
Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.
Flattens an Observable-of-Observables.
mergeAll subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, it subscribes to that and delivers all the values from the inner Observable on the output Observable. The output Observable only completes once all inner Observables have completed. Any error delivered by an inner Observable will be immediately emitted on the output Observable.
Params:
Name | Type | Attribute | Description |
concurrent | number | | Maximum number of inner Observables being subscribed to concurrently. |
Return:
Observable | An Observable that emits values coming from all the inner Observables emitted by the source Observable. |
Example:
// Spawn a new interval Observable for each click event, and blend their outputs as one Observable
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var firstOrder = higherOrder.mergeAll();
firstOrder.subscribe(x => console.log(x));
// Count from 0 to 9 every second for each click, but only allow 2 concurrent timers
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(10));
var firstOrder = higherOrder.mergeAll(2);
firstOrder.subscribe(x => console.log(x));
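The effect of the concurrent parameter can be sketched as a waiting queue. This is a rough sketch under assumptions, not the RxJS implementation: a "source" is modelled as a plain function that pushes notifications into an observer, and manualSource, waiting and isSubscribed are hypothetical names introduced for the example.

```javascript
// Hypothetical model (not RxJS itself): a "source" is a function that
// pushes notifications into an observer with next/error/complete methods.
function mergeAll(higherOrder, concurrent = Infinity) {
  return function (observer) {
    const waiting = [];    // inner sources not yet subscribed
    let active = 0;        // inner sources currently subscribed
    let outerDone = false;

    function subscribeInner(inner) {
      active += 1;
      inner({
        next: value => observer.next(value),
        error: err => observer.error(err),
        complete: () => {
          active -= 1;
          if (waiting.length > 0) {
            subscribeInner(waiting.shift()); // a slot is free again
          } else if (outerDone && active === 0) {
            observer.complete();
          }
        }
      });
    }

    higherOrder({
      next: inner => {
        if (active < concurrent) subscribeInner(inner);
        else waiting.push(inner);
      },
      error: err => observer.error(err),
      complete: () => {
        outerDone = true;
        if (active === 0 && waiting.length === 0) observer.complete();
      }
    });
  };
}

// A source whose notifications are triggered by hand, to imitate timing.
function manualSource() {
  let target = null;
  const source = observer => { target = observer; };
  source.isSubscribed = () => target !== null;
  source.next = value => target.next(value);
  source.complete = () => target.complete();
  return source;
}

const a = manualSource();
const b = manualSource();
const c = manualSource();
const outer = observer => {
  [a, b, c].forEach(inner => observer.next(inner));
  observer.complete();
};

const log = [];
mergeAll(outer, 2)({
  next: value => log.push(value),
  error: () => log.push('error'),
  complete: () => log.push('done')
});
log.push('c subscribed: ' + c.isSubscribed()); // a and b occupy both slots
a.next('a1');
a.complete();                                  // frees a slot for c
log.push('c subscribed: ' + c.isSubscribed());
c.next('c1');
b.next('b1');
b.complete();
c.complete();
console.log(log.join(', '));
// c subscribed: false, a1, c subscribed: true, c1, b1, done
```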
public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable source
Projects each source value to an Observable which is merged in the output Observable.
Maps each value to an Observable, then flattens all of these inner Observables using mergeAll.
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
Params:
Name | Type | Attribute | Description |
project | function(value: T, ?index: number): ObservableInput | | A function that, when applied to an item emitted by the source Observable, returns an Observable. |
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any | | A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are outerValue, innerValue, outerIndex and innerIndex. |
concurrent | number | | Maximum number of input Observables being subscribed to concurrently. |
Return:
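Observable | An Observable that emits the result of applying the projection function (and the optional resultSelector) to each item emitted by the source Observable and merging the results of the Observables obtained from this transformation. |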
Example:
var letters = Rx.Observable.of('a', 'b', 'c');
var result = letters.mergeMap(x =>
  Rx.Observable.interval(1000).map(i => x+i)
);
result.subscribe(x => console.log(x));
// Results in the following:
// a0
// b0
// c0
// a1
// b1
// c1
// continues to list a,b,c with respective ascending integers
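The role of project and the four resultSelector arguments can be sketched with synchronous sources. This is an illustration under assumptions, not the RxJS implementation: a "source" is modelled as a plain function that pushes notifications into an observer, and the helper of here is a hypothetical stand-in, not Rx.Observable.of. Because these sources are synchronous, each inner sequence finishes before the next outer value arrives, so the output is not interleaved as it is with interval.

```javascript
// Hypothetical model (not RxJS itself): a "source" is a function that
// pushes notifications into an observer with next/error/complete methods.
function of(...values) {
  return observer => {
    values.forEach(value => observer.next(value));
    observer.complete();
  };
}

function mergeMap(source, project, resultSelector) {
  return function (observer) {
    let active = 0;
    let outerDone = false;
    let outerCount = 0;
    source({
      next: outerValue => {
        const outerIndex = outerCount++;
        let innerCount = 0;
        active += 1;
        // Map the outer value to an inner source and subscribe right away.
        project(outerValue, outerIndex)({
          next: innerValue => {
            const innerIndex = innerCount++;
            observer.next(resultSelector
              ? resultSelector(outerValue, innerValue, outerIndex, innerIndex)
              : innerValue);
          },
          error: err => observer.error(err),
          complete: () => {
            active -= 1;
            if (outerDone && active === 0) observer.complete();
          }
        });
      },
      error: err => observer.error(err),
      complete: () => {
        outerDone = true;
        if (active === 0) observer.complete();
      }
    });
  };
}

const log = [];
mergeMap(
  of('a', 'b'),
  letter => of(1, 2),
  (outerValue, innerValue, outerIndex, innerIndex) =>
    outerValue + innerValue + '@' + outerIndex + '.' + innerIndex
)({
  next: value => log.push(value),
  error: () => log.push('error'),
  complete: () => log.push('done')
});
console.log(log.join(' ')); // a1@0.0 a2@0.1 b1@1.0 b2@1.1 done
```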
public mergeMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable source
Projects each source value to the same Observable which is merged multiple times in the output Observable.
It's like mergeMap, but maps each value always to the same inner Observable.
Maps each source value to the given Observable innerObservable regardless of the source value, and then merges those resulting Observables into one single Observable, which is the output Observable.
Params:
Name | Type | Attribute | Description |
innerObservable | ObservableInput | | An Observable to replace each value from the source Observable. |
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any | | A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are outerValue, innerValue, outerIndex and innerIndex. |
concurrent | number | | Maximum number of input Observables being subscribed to concurrently. |
Return:
Observable | An Observable that emits items from the given innerObservable every time a value is emitted on the source Observable. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.mergeMapTo(Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
public mergeScan(accumulator: function(acc: R, value: T): Observable<R>, seed: *, concurrent: number): Observable<R> source
Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable returned is merged into the output Observable.
It's like scan, but the Observables returned by the accumulator are merged into the outer Observable.
Params:
Name | Type | Attribute | Description |
accumulator | function(acc: R, value: T): Observable<R> | | The accumulator function called on each source value. |
seed | * | | The initial accumulation value. |
concurrent | number | | Maximum number of input Observables being subscribed to concurrently. |
Example:
const click$ = Rx.Observable.fromEvent(document, 'click');
const one$ = click$.mapTo(1);
const seed = 0;
const count$ = one$.mergeScan((acc, one) => Rx.Observable.of(acc + one), seed);
count$.subscribe(x => console.log(x));
// Results:
// 1
// 2
// 3
// 4
// ...and so on for each click
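The way the accumulated value flows through the inner Observables can be sketched with synchronous sources. This is a minimal sketch under assumptions, not the RxJS implementation: a "source" is modelled as a plain function that pushes notifications into an observer, the helper of is a hypothetical stand-in for Rx.Observable.of, and four fixed values stand in for four clicks.

```javascript
// Hypothetical model (not RxJS itself): a "source" is a function that
// pushes notifications into an observer with next/error/complete methods.
function of(...values) {
  return observer => {
    values.forEach(value => observer.next(value));
    observer.complete();
  };
}

function mergeScan(source, accumulator, seed) {
  return function (observer) {
    let acc = seed;
    let active = 0;
    let outerDone = false;
    source({
      next: value => {
        active += 1;
        // The accumulator returns a source; every value it emits becomes
        // both an output value and the new accumulated state.
        accumulator(acc, value)({
          next: result => {
            acc = result;
            observer.next(result);
          },
          error: err => observer.error(err),
          complete: () => {
            active -= 1;
            if (outerDone && active === 0) observer.complete();
          }
        });
      },
      error: err => observer.error(err),
      complete: () => {
        outerDone = true;
        if (active === 0) observer.complete();
      }
    });
  };
}

// Four "clicks", each mapped to the constant 1.
const counts = [];
mergeScan(of(1, 1, 1, 1), (acc, one) => of(acc + one), 0)({
  next: count => counts.push(count),
  error: () => counts.push('error'),
  complete: () => counts.push('done')
});
console.log(counts.join(' ')); // 1 2 3 4 done
```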
public min(comparer: Function): Observable<R> source
The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when the source Observable completes, it emits a single item: the item with the smallest value.
Params:
Name | Type | Attribute | Description |
comparer | Function | | Optional comparer function that it will use instead of its default to compare the value of two items. |
Example:
// Get the minimal value of a series of numbers
Rx.Observable.of(5, 4, 7, 2, 8)
  .min()
  .subscribe(x => console.log(x)); // -> 2
// Use a comparer function to get the minimal item (TypeScript)
interface Person {
  age: number,
  name: string
}
Observable.of<Person>({age: 7, name: 'Foo'},
                      {age: 5, name: 'Bar'},
                      {age: 9, name: 'Beer'})
  .min<Person>((a: Person, b: Person) => a.age < b.age ? -1 : 1)
  .subscribe((x: Person) => console.log(x.name)); // -> 'Bar'
public multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable source
Returns an Observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable that shares a single subscription to the underlying stream.
Params:
Name | Type | Attribute | Description |
subjectOrSubjectFactory | Function | Subject | | Factory function to create an intermediate subject through which the source sequence's elements will be multicast to the selector function or Subject to push source elements into. |
selector | Function | | Optional selector function that can use the multicasted source stream as many times as needed, without causing multiple subscriptions to the source stream. Subscribers to the given source will receive all notifications of the source from the time of the subscription forward. |
Return:
Observable | An Observable that emits the results of invoking the selector on the items emitted by a ConnectableObservable that shares a single subscription to the underlying stream. |
public multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable<T> | ConnectableObservable<T> source
Allows source Observable to be subscribed only once with a Subject of choice, while still sharing its values between multiple subscribers.
Subscribe to Observable once, but send its values to multiple subscribers.
multicast is an operator that works in two modes.
In the first mode you provide a single argument to it, which can be either an initialized Subject or a Subject factory. As a result you will get a special kind of an Observable - a ConnectableObservable. It can be subscribed multiple times, just as a regular Observable, but it won't subscribe to the source Observable at that moment. It will do so only if you call its connect method. This means you can essentially control by hand when the source Observable will actually be subscribed. What is more, ConnectableObservable will share this one subscription between all of its subscribers. This means that, for example, an ajax Observable will only send a request once, even though usually it would send a request per every subscriber. Since it sends a request at the moment of subscription, here the request would be sent when the connect method of a ConnectableObservable is called.
The most common pattern of using ConnectableObservable is calling connect when the first consumer subscribes, keeping the subscription alive while several consumers come and go, and finally unsubscribing from the source Observable when the last consumer unsubscribes. To avoid implementing that logic over and over again, ConnectableObservable has a special operator, refCount. When called, it returns an Observable which will count the number of consumers subscribed to it and keep ConnectableObservable connected as long as there is at least one consumer. So if you don't actually need to decide yourself when to connect and disconnect a ConnectableObservable, use refCount.
The second mode is invoked by calling multicast with an additional, second argument - a selector function. This function accepts an Observable - which basically mirrors the source Observable - and returns an Observable as well, which should be the input stream modified by any operators you want. Note that in this mode you cannot provide an initialized Subject as a first argument - it has to be a Subject factory. If you provide a selector function, multicast returns just a regular Observable, instead of ConnectableObservable. Thus, as usual, each subscription to this stream triggers a subscription to the source Observable. However, if inside the selector function you subscribe to the input Observable multiple times, the actual source stream will be subscribed only once. So if you have a chain of operators that use some Observable many times, but you want to subscribe to that Observable only once, this is the mode you would use.
The Subject provided as a first parameter of multicast is used as a proxy for the single subscription to the source Observable. It means that all values from the source stream go through that Subject. Thus, if a Subject has some special properties, the Observable returned by multicast will have them as well. If you want to use multicast with a Subject that is one of the ones included in RxJS by default - Subject, AsyncSubject, BehaviorSubject, or ReplaySubject - simply use publish, publishLast, publishBehavior or publishReplay respectively. These are actually just wrappers around multicast, with a specific Subject hardcoded inside.
Also, if you use publish or publishReplay with a ConnectableObservable's refCount operator, you can simply use share and shareReplay respectively, which chain these two.
Params:
Name | Type | Attribute | Description |
subjectOrSubjectFactory | Function | Subject | | Factory function to create an intermediate Subject through which the source sequence's elements will be multicast to the selector function input Observable or ConnectableObservable returned by the operator. |
selector | Function | | Optional selector function that can use the input stream as many times as needed, without causing multiple subscriptions to the source stream. Subscribers to the input source will receive all notifications of the source from the time of the subscription forward. |
Return:
Observable<T> | ConnectableObservable<T> | An Observable that emits the results of invoking the selector on the source stream or a special ConnectableObservable, if selector was not provided. |
Example:
// Use ConnectableObservable
const seconds = Rx.Observable.interval(1000);
const connectableSeconds = seconds.multicast(new Rx.Subject());
connectableSeconds.subscribe(value => console.log('first: ' + value));
connectableSeconds.subscribe(value => console.log('second: ' + value));
// At this point still nothing happens, even though we subscribed twice.
connectableSeconds.connect();
// From now on `seconds` are being logged to the console,
// twice per every second. `seconds` Observable was however only subscribed once,
// so under the hood Observable.interval had only one clock started.
// Use selector (a separate example)
const seconds = Rx.Observable.interval(1000);
seconds
  .multicast(
    () => new Rx.Subject(),
    seconds => seconds.zip(seconds) // Usually zip would subscribe to `seconds` twice.
                                    // Because we are inside selector, `seconds` is subscribed once,
  )                                 // thus starting only one clock used internally by Observable.interval.
  .subscribe();
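The first mode (subscribe now, connect later, one shared subscription) can be sketched without RxJS. The snippet below is an illustration under assumptions, not the library's implementation: a "source" is modelled as a plain function that pushes notifications into an observer, the returned object only imitates the subscribe and connect methods of a ConnectableObservable, and countedSource is a hypothetical source that records how often it is subscribed.

```javascript
// Hypothetical model (not RxJS itself): a "source" is a function that
// pushes notifications into an observer with next/error/complete methods.
function multicast(source) {
  const observers = [];
  return {
    // Subscribing only registers the observer; the source is not touched yet.
    subscribe: observer => { observers.push(observer); },
    // connect subscribes to the source once and fans out every notification.
    connect: () => source({
      next: value => observers.forEach(o => o.next(value)),
      error: err => observers.forEach(o => o.error(err)),
      complete: () => observers.forEach(o => o.complete())
    })
  };
}

// A source that counts how many times it has been subscribed.
let sourceSubscriptions = 0;
function countedSource(observer) {
  sourceSubscriptions += 1;
  observer.next(0);
  observer.next(1);
  observer.complete();
}

const connectable = multicast(countedSource);
const log = [];
connectable.subscribe({
  next: value => log.push('first: ' + value),
  error: () => {},
  complete: () => {}
});
connectable.subscribe({
  next: value => log.push('second: ' + value),
  error: () => {},
  complete: () => {}
});
const loggedBeforeConnect = log.length; // nothing has been emitted yet
connectable.connect();
console.log(loggedBeforeConnect, sourceSubscriptions, log.join(', '));
// 0 1 first: 0, second: 0, first: 1, second: 1
```

Both observers see every value, yet the source function ran only once, which is the property that lets an expensive source such as a request be shared.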
public observeOn(scheduler: IScheduler, delay: number): Observable<T> source
Re-emits all notifications from source Observable with specified scheduler.
Ensure a specific scheduler is used, from outside of an Observable.
observeOn is an operator that accepts a scheduler as a first parameter, which will be used to reschedule notifications emitted by the source Observable. It might be useful if you do not have control over the internal scheduler of a given Observable, but want to control when its values are emitted nevertheless.
The returned Observable emits the same notifications (nexted values, complete and error events) as the source Observable, but rescheduled with the provided scheduler. Note that this doesn't mean that the source Observable's internal scheduler will be replaced in any way. The original scheduler will still be used, but when the source Observable emits a notification, it will be immediately scheduled again - this time with the scheduler passed to observeOn.
An anti-pattern would be calling observeOn on an Observable that emits lots of values synchronously, to split those emissions into asynchronous chunks. For this to happen, the scheduler would have to be passed into the source Observable directly (usually into the operator that creates it). observeOn simply delays notifications a little bit more, to ensure that they are emitted at expected moments.
As a matter of fact, observeOn accepts a second parameter, which specifies in milliseconds with what delay notifications will be emitted. The main difference between the delay operator and observeOn is that observeOn will delay all notifications - including error notifications - while delay will pass through an error from the source Observable immediately when it is emitted. In general it is highly recommended to use the delay operator for any kind of delaying of values in the stream, while using observeOn to specify which scheduler should be used for notification emissions in general.
Params:
Name | Type | Attribute | Description |
scheduler | IScheduler | | Scheduler that will be used to reschedule notifications from source Observable. |
delay | number | | Number of milliseconds that states with what delay every notification should be rescheduled. |
Return:
Observable<T> | Observable that emits the same notifications as the source Observable, but with provided scheduler. |
Example:
const intervals = Rx.Observable.interval(10); // Intervals are scheduled
                                              // with async scheduler by default...
intervals
  .observeOn(Rx.Scheduler.animationFrame)     // ...but we will observe on animationFrame
  .subscribe(val => {                         // scheduler to ensure smooth animation.
    someDiv.style.height = val + 'px';
  });
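The rescheduling idea can be sketched with a hand-driven task queue. This is a rough sketch under assumptions, not the RxJS implementation: a "source" is modelled as a plain function that pushes notifications into an observer, and manualScheduler with its enqueue and flush methods is a hypothetical stand-in that does not mirror the real Scheduler API.

```javascript
// Hypothetical model (not RxJS itself): a "source" is a function that
// pushes notifications into an observer with next/error/complete methods.
function observeOn(source, scheduler) {
  return function (observer) {
    source({
      // Every notification, errors included, is handed to the scheduler
      // instead of being delivered on the spot.
      next: value => scheduler.enqueue(() => observer.next(value)),
      error: err => scheduler.enqueue(() => observer.error(err)),
      complete: () => scheduler.enqueue(() => observer.complete())
    });
  };
}

// Hypothetical scheduler: holds tasks until flush() is called.
const manualScheduler = {
  tasks: [],
  enqueue(task) { this.tasks.push(task); },
  flush() {
    while (this.tasks.length > 0) this.tasks.shift()();
  }
};

// A source that emits everything synchronously during subscription.
function syncSource(observer) {
  observer.next(1);
  observer.next(2);
  observer.complete();
}

const log = [];
observeOn(syncSource, manualScheduler)({
  next: value => log.push(value),
  error: () => log.push('error'),
  complete: () => log.push('done')
});
log.push('subscribed');  // runs before any value is delivered
manualScheduler.flush(); // the scheduler decides when delivery happens
console.log(log.join(' ')); // subscribed 1 2 done
```

The source still emits synchronously; only the delivery to the observer moves to the moment the scheduler chooses.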
public onErrorResumeNext(observables: ...ObservableInput): Observable source
When any of the provided Observables emits a complete or error notification, it immediately subscribes to the next one that was passed.
Execute series of Observables no matter what, even if it means swallowing errors.
onErrorResumeNext is an operator that accepts a series of Observables, provided either directly as arguments or as an array. If no Observable is provided, the returned Observable will simply behave the same as the source.
onErrorResumeNext returns an Observable that starts by subscribing and re-emitting values from the source Observable. When its stream of values ends - no matter if the Observable completed or emitted an error - onErrorResumeNext will subscribe to the first Observable that was passed as an argument to the method. It will start re-emitting its values as well and - again - when that stream ends, onErrorResumeNext will proceed to subscribing to yet another Observable in the provided series, no matter if the previous Observable completed or ended with an error. This will keep happening until there are no more Observables left in the series, at which point the returned Observable will complete - even if the last subscribed stream ended with an error.
onErrorResumeNext can therefore be thought of as a version of the concat operator which is more permissive when it comes to the errors emitted by its input Observables. While concat subscribes to the next Observable in the series only if the previous one successfully completed, onErrorResumeNext subscribes even if it ended with an error.
Note that you do not get any access to errors emitted by the Observables. In particular do not expect these errors to appear in error callback passed to subscribe. If you want to take specific actions based on what error was emitted by an Observable, you should try out catch instead.
Params:
Name | Type | Attribute | Description |
observables | ...ObservableInput | Observables passed either directly or as an array. |
Return:
Observable | An Observable that emits values from source Observable, but - if it errors - subscribes to the next passed Observable and so on, until it completes or runs out of Observables. |
Example:
Rx.Observable.of(1, 2, 3, 0)
  .map(x => {
    if (x === 0) { throw Error(); }
    return 10 / x;
  })
  .onErrorResumeNext(Rx.Observable.of(1, 2, 3))
  .subscribe(
    val => console.log(val),
    err => console.log(err),          // Will never be called.
    () => console.log('that\'s it!')
  );
// Logs:
// 10
// 5
// 3.3333333333333335
// 1
// 2
// 3
// "that's it!"
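The contrast with concat described above can be modeled without RxJS at all. The following is only a synchronous sketch over plain records; the names strictConcat and resumeNext and the { values, failed } record shape are hypothetical and not part of the library.

```javascript
// Plain-array model, not RxJS: each "stream" is a finished record of the
// values it emitted plus whether it ended with an error.
const streams = [
  { values: [10, 5], failed: true },    // errors after two values
  { values: [1, 2, 3], failed: false }  // completes normally
];

// Models concat: stop at the first stream that failed and report the error.
function strictConcat(list) {
  const out = [];
  for (const s of list) {
    out.push(...s.values);
    if (s.failed) { return { values: out, failed: true }; }
  }
  return { values: out, failed: false };
}

// Models onErrorResumeNext: always move on, and never report an error.
function resumeNext(list) {
  const out = [];
  for (const s of list) { out.push(...s.values); }
  return { values: out, failed: false };
}

const strictResult = strictConcat(streams);  // stops after [10, 5], failed
const lenientResult = resumeNext(streams);   // [10, 5, 1, 2, 3], completed
```

Note how the lenient version gives the caller no way to learn that the first stream failed, which mirrors the warning above about errors being swallowed.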
public pairwise(): Observable<Array<T>> source
Groups pairs of consecutive emissions together and emits them as an array of two values.
Puts the current value and previous value together as an array, and emits that.
The Nth emission from the source Observable will cause the output Observable to emit an array [(N-1)th, Nth] of the previous and the current value, as a pair. For this reason, pairwise emits on the second and subsequent emissions from the source Observable, but not on the first emission, because there is no previous value in that case.
Return:
Observable<Array<T>> | An Observable of pairs (as arrays) of consecutive values from the source Observable. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var pairs = clicks.pairwise();
var distance = pairs.map(pair => {
  var x0 = pair[0].clientX;
  var y0 = pair[0].clientY;
  var x1 = pair[1].clientX;
  var y1 = pair[1].clientY;
  return Math.sqrt(Math.pow(x0 - x1, 2) + Math.pow(y0 - y1, 2));
});
distance.subscribe(x => console.log(x));
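The [(N-1)th, Nth] rule can be checked on a plain array. This is a sketch of the semantics only; pairwiseModel is a hypothetical helper that treats an array as the full list of source emissions.

```javascript
// Array-based model of pairwise: one pair per emission, starting with the second.
function pairwiseModel(values) {
  const pairs = [];
  for (let n = 1; n < values.length; n++) {
    pairs.push([values[n - 1], values[n]]);
  }
  return pairs;
}

const pairsOfFour = pairwiseModel(['a', 'b', 'c', 'd']);
// [['a', 'b'], ['b', 'c'], ['c', 'd']]
const pairsOfOne = pairwiseModel(['a']);
// [] because a single emission has no previous value
```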
public partition(predicate: function(value: T, index: number): boolean, thisArg: any): [Observable<T>, Observable<T>] source
Splits the source Observable into two, one with values that satisfy a predicate, and another with values that don't satisfy the predicate.
It's like filter, but returns two Observables: one like the output of filter, and the other with values that did not pass the condition.
partition outputs an array with two Observables that partition the values from the source Observable through the given predicate function. The first Observable in that array emits source values for which the predicate argument returns true. The second Observable emits source values for which the predicate returns false. The first behaves like filter and the second behaves like filter with the predicate negated.
Params:
Name | Type | Attribute | Description |
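predicate | function(value: T, index: number): boolean | | A function that evaluates each value emitted by the source Observable. If it returns true, the value is emitted on the first Observable in the returned array; if it returns false, the value is emitted on the second Observable in the array. |
thisArg | any | | An optional argument to determine the value of this in the predicate function. |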
Return:
[Observable<T>, Observable<T>] | An array with two Observables: one with values that passed the predicate, and another with values that did not pass the predicate. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var parts = clicks.partition(ev => ev.target.tagName === 'DIV');
var clicksOnDivs = parts[0];
var clicksElsewhere = parts[1];
clicksOnDivs.subscribe(x => console.log('DIV clicked: ', x));
clicksElsewhere.subscribe(x => console.log('Other clicked: ', x));
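The statement that the two outputs behave like filter and filter with the predicate negated can be sketched on arrays. partitionModel is a hypothetical helper and uses Array.prototype.filter in place of the Observable operator.

```javascript
// Array-based model of partition: two filters over the same input,
// the second one with the predicate negated.
function partitionModel(values, predicate, thisArg) {
  const passed = values.filter(predicate, thisArg);
  const rejected = values.filter(function (value, index) {
    return !predicate.call(thisArg, value, index);
  });
  return [passed, rejected];
}

const [evens, odds] = partitionModel([1, 2, 3, 4, 5, 6], x => x % 2 === 0);
// evens is [2, 4, 6], odds is [1, 3, 5]
```

Every input value lands in exactly one of the two outputs, and the relative order within each output is preserved.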
public pipe(operations: ...*): Observable source
Used to stitch together functional operators into a chain.
Params:
Name | Type | Attribute | Description |
operations | ...* |
Return:
Observable | The Observable that results from calling all of the operators in the order they were passed in. |
Example:
import { map, filter, scan } from 'rxjs/operators';
Rx.Observable.interval(1000)
  .pipe(
    filter(x => x % 2 === 0),
    map(x => x + x),
    scan((acc, x) => acc + x)
  )
  .subscribe(x => console.log(x));
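Conceptually, stitching operators into a chain is left-to-right function composition. The sketch below shows that idea on arrays; pipeModel, filterOp, mapOp and scanOp are hypothetical stand-ins, and the library's own implementation may differ.

```javascript
// Array-based stand-ins for operators: each takes its configuration and
// returns a function from one array to another.
const filterOp = predicate => input => input.filter(predicate);
const mapOp = project => input => input.map(project);
const scanOp = accumulator => input => {
  const out = [];
  input.forEach((x, i) => {
    // With no seed, the first value is passed through unchanged.
    out.push(i === 0 ? x : accumulator(out[i - 1], x));
  });
  return out;
};

// Feed the source through each operation in the order given.
function pipeModel(source, ...operations) {
  return operations.reduce((previous, operation) => operation(previous), source);
}

const piped = pipeModel(
  [0, 1, 2, 3, 4, 5, 6],
  filterOp(x => x % 2 === 0),  // [0, 2, 4, 6]
  mapOp(x => x + x),           // [0, 4, 8, 12]
  scanOp((acc, x) => acc + x)  // [0, 4, 12, 24]
);
```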
public pluck(properties: ...string): Observable source
Maps each source value (an object) to its specified nested property.
Like map, but meant only for picking one of the nested properties of every emitted object.
Given a list of strings describing a path to an object property, retrieves the value of a specified nested property from all values in the source Observable. If a property can't be resolved, it will return undefined for that value.
Params:
Name | Type | Attribute | Description |
properties | ...string | The nested properties to pluck from each source value (an object). |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var tagNames = clicks.pluck('target', 'tagName');
tagNames.subscribe(x => console.log(x));
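The path lookup applied to each value can be sketched as a small function. pluckModel and the fakeClick object are hypothetical and only illustrate the resolve-or-undefined rule described above.

```javascript
// Walks the property path one name at a time; gives up with undefined
// as soon as an intermediate value is missing.
function pluckModel(value, ...properties) {
  let current = value;
  for (const name of properties) {
    if (current === null || current === undefined) { return undefined; }
    current = current[name];
  }
  return current;
}

const fakeClick = { target: { tagName: 'DIV' } };
const tagName = pluckModel(fakeClick, 'target', 'tagName');  // 'DIV'
const missing = pluckModel(fakeClick, 'detail', 'x');        // undefined
```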
public publish(selector: Function): * source
Returns a ConnectableObservable, which is a variety of Observable that waits until its connect method is called before it begins emitting items to those Observers that have subscribed to it.
Params:
Name | Type | Attribute | Description |
selector | Function | | Optional selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all notifications of the source from the time of the subscription on. |
Return:
* | A ConnectableObservable that upon connection causes the source Observable to emit items to its Observers. |
public publishBehavior(value: *): ConnectableObservable<T> source
Params:
Name | Type | Attribute | Description |
value | * |
public publishLast(): ConnectableObservable<T> source
public publishReplay(bufferSize: *, windowTime: *, selectorOrScheduler: *, scheduler: *): Observable<T> | ConnectableObservable<T> source
Params:
Name | Type | Attribute | Description |
bufferSize | * | ||
windowTime | * | ||
selectorOrScheduler | * | ||
scheduler | * |
Return:
Observable<T> | ConnectableObservable<T> |
public race(): Observable source
Returns an Observable that mirrors the first source Observable to emit an item from the combination of this Observable and supplied Observables.
Params:
Name | Type | Attribute | Description |
...observables | ...Observables | Sources used to race for which Observable emits first. |
public reduce(accumulator: function(acc: R, value: T, index: number): R, seed: R): Observable<R> source
Applies an accumulator function over the source Observable, and returns the accumulated result when the source completes, given an optional seed value.
Combines together all values emitted on the source, using an accumulator function that knows how to join a new source value into the accumulation from the past.
Like Array.prototype.reduce(), reduce applies an accumulator function against an accumulation and each value of the source Observable (from the past) to reduce it to a single value, emitted on the output Observable. Note that reduce will only emit one value, only when the source Observable completes. It is equivalent to applying operator scan followed by operator last.
Returns an Observable that applies a specified accumulator function to each item emitted by the source Observable. If a seed value is specified, then that value will be used as the initial value for the accumulator. If no seed value is specified, the first item of the source is used as the seed.
Return:
Observable<R> | An Observable that emits a single value that is the result of accumulating the values emitted by the source Observable. |
Example:
var clicksInFiveSeconds = Rx.Observable.fromEvent(document, 'click')
.takeUntil(Rx.Observable.interval(5000));
var ones = clicksInFiveSeconds.mapTo(1);
var seed = 0;
var count = ones.reduce((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));
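The "scan followed by last" equivalence and the seed rule can be sketched on arrays. scanModel and reduceModel are hypothetical helpers that treat an array as the full list of source emissions; they are not the library's implementation.

```javascript
// Array-based model of scan: returns every intermediate accumulation.
// The seed is optional, so it is collected with a rest parameter.
function scanModel(values, accumulator, ...seedArg) {
  const hasSeed = seedArg.length > 0;
  let acc = seedArg[0];
  const out = [];
  values.forEach((value, index) => {
    // Without a seed, the first value becomes the initial accumulation.
    acc = (index === 0 && !hasSeed) ? value : accumulator(acc, value, index);
    out.push(acc);
  });
  return out;
}

// Array-based model of reduce: scan, then keep only the last accumulation.
// Assumes at least one value; the empty case is left out of this sketch.
function reduceModel(values, accumulator, ...seedArg) {
  const steps = scanModel(values, accumulator, ...seedArg);
  return steps[steps.length - 1];
}

const add = (acc, one) => acc + one;
const runningCounts = scanModel([1, 1, 1], add, 0);  // [1, 2, 3]
const finalCount = reduceModel([1, 1, 1], add, 0);   // 3
const unseeded = scanModel([5, 1, 1], add);          // [5, 6, 7]
```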
public repeat(count: number): Observable source
Returns an Observable that repeats the stream of items emitted by the source Observable at most count times.
Params:
Name | Type | Attribute | Description |
count | number | | The number of times the source Observable items are repeated, a count of 0 will yield an empty Observable. |
Return:
Observable | An Observable that repeats the stream of items emitted by the source Observable at most count times. |
public repeatWhen(notifier: function(notifications: Observable): Observable): Observable source
Returns an Observable that mirrors the source Observable with the exception of a complete. If the source Observable calls complete, this method will emit to the Observable returned from notifier. If that Observable calls complete or error, then this method will call complete or error on the child subscription. Otherwise this method will resubscribe to the source Observable.
Params:
Name | Type | Attribute | Description |
notifier | function(notifications: Observable): Observable | Receives an Observable of notifications with which a user can complete or error, aborting the repetition. |
public retry(count: number): Observable source
Returns an Observable that mirrors the source Observable with the exception of an error. If the source Observable calls error, this method will resubscribe to the source Observable for a maximum of count resubscriptions (given as a number parameter) rather than propagating the error call.
Any and all items emitted by the source Observable will be emitted by the resulting Observable, even those emitted during failed subscriptions. For example, if an Observable fails at first but emits [1, 2], then succeeds the second time and emits [1, 2, 3, 4, 5], then the complete stream of emissions and notifications would be: [1, 2, 1, 2, 3, 4, 5, complete].
Params:
Name | Type | Attribute | Description |
count | number | Number of retry attempts before failing. |
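The [1, 2, 1, 2, 3, 4, 5, complete] scenario above can be reproduced with a synchronous model. flakySource and retryModel are hypothetical: each call to the source function stands for one subscription, and a thrown exception stands for an error notification.

```javascript
// Hypothetical source: fails after emitting 1 and 2 on the first
// subscription, then succeeds on the second.
let attempts = 0;
function flakySource(emit) {
  attempts += 1;
  emit(1);
  emit(2);
  if (attempts === 1) { throw new Error('first attempt fails'); }
  emit(3);
  emit(4);
  emit(5);
}

// Resubscribes on failure up to `count` times, keeping everything that
// was emitted along the way, including values from failed attempts.
function retryModel(source, count) {
  const seen = [];
  for (let resubscriptions = 0; ; resubscriptions++) {
    try {
      source(value => seen.push(value));
      seen.push('complete');
      return seen;
    } catch (err) {
      if (resubscriptions >= count) {
        seen.push('error');
        return seen;
      }
    }
  }
}

const retried = retryModel(flakySource, 1);
// [1, 2, 1, 2, 3, 4, 5, 'complete']
```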
public retryWhen(notifier: function(errors: Observable): Observable): Observable source
Returns an Observable that mirrors the source Observable with the exception of an error. If the source Observable calls error, this method will emit the error value that caused it to the Observable returned from notifier. If that Observable calls complete or error, then this method will call complete or error on the child subscription. Otherwise this method will resubscribe to the source Observable.
Params:
Name | Type | Attribute | Description |
notifier | function(errors: Observable): Observable | Receives an Observable of notifications with which a user can complete or error, aborting the retry. |
public sample(notifier: Observable<any>): Observable<T> source
Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.
It's like sampleTime, but samples whenever the notifier Observable emits something.
Whenever the notifier Observable emits a value or completes, sample looks at the source Observable and emits whichever value it has most recently emitted since the previous sampling, unless the source has not emitted anything since the previous sampling. The notifier is subscribed to as soon as the output Observable is subscribed.
Params:
Name | Type | Attribute | Description |
notifier | Observable<any> | The Observable to use for sampling the source Observable. |
Return:
Observable<T> | An Observable that emits the results of sampling the values emitted by the source Observable whenever the notifier Observable emits value or completes. |
Example:
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = seconds.sample(clicks);
result.subscribe(x => console.log(x));
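The "most recent value since the previous sampling" rule can be sketched on a timeline of timestamped events. sampleModel and the { time, value } record shape are hypothetical, and the sketch assumes both inputs are sorted by time.

```javascript
// Timeline model of sample: for each notifier time, emit the latest source
// value seen since the previous sampling, or nothing if there was none.
function sampleModel(sourceEvents, notifierTimes) {
  const out = [];
  let cursor = 0;
  for (const tick of notifierTimes) {
    let latest;
    let hasValue = false;
    while (cursor < sourceEvents.length && sourceEvents[cursor].time <= tick) {
      latest = sourceEvents[cursor].value;
      hasValue = true;
      cursor += 1;
    }
    if (hasValue) { out.push(latest); }
  }
  return out;
}

// Source emits 0, 1, 2 once per second; the notifier fires three times.
const ticks = [
  { time: 1000, value: 0 },
  { time: 2000, value: 1 },
  { time: 3000, value: 2 }
];
const sampled = sampleModel(ticks, [2500, 2600, 3500]);
// At 2500 the latest value is 1; at 2600 nothing new arrived; at 3500 it is 2.
```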
public sampleTime(period: number, scheduler: Scheduler): Observable<T> source
Emits the most recently emitted value from the source Observable within periodic time intervals.
Samples the source Observable at periodic time intervals, emitting what it samples.
sampleTime periodically looks at the source Observable and emits whichever value it has most recently emitted since the previous sampling, unless the source has not emitted anything since the previous sampling. The sampling happens periodically in time every period milliseconds (or the time unit defined by the optional scheduler argument). The sampling starts as soon as the output Observable is subscribed.
Return:
Observable<T> | An Observable that emits the results of sampling the values emitted by the source Observable at the specified time interval. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.sampleTime(1000);
result.subscribe(x => console.log(x));
public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R> source
Applies an accumulator function over the source Observable, and returns each intermediate result, with an optional seed value.
It's like reduce, but emits the current accumulation whenever the source emits a value.
Combines together all values emitted on the source, using an accumulator function that knows how to join a new source value into the accumulation from the past. It is similar to reduce, but emits the intermediate accumulations.
Returns an Observable that applies a specified accumulator function to each item emitted by the source Observable. If a seed value is specified, then that value will be used as the initial value for the accumulator. If no seed value is specified, the first item of the source is used as the seed.
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var ones = clicks.mapTo(1);
var seed = 0;
var count = ones.scan((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));
public sequenceEqual(compareTo: Observable, comparor: function): Observable source
Compares all values of two observables in sequence using an optional comparor function and returns an observable of a single boolean value representing whether or not the two sequences are equal.
Checks to see if all values emitted by both observables are equal, in order.
sequenceEqual subscribes to two observables and buffers incoming values from each observable. Whenever either observable emits a value, the value is buffered and the buffers are shifted and compared from the bottom up. If any value pair doesn't match, the returned observable will emit false and complete. If one of the observables completes, the operator will wait for the other observable to complete. If the other observable emits before completing, the returned observable will emit false and complete. If one observable never completes or emits after the other completes, the returned observable will never complete.
Params:
Name | Type | Attribute | Description |
compareTo | Observable | The observable sequence to compare the source sequence to. |
comparor | function | | An optional function to compare each value pair |
Return:
Observable | An Observable of a single boolean value representing whether or not the values emitted by both observables were equal in sequence. |
Example:
var code = Rx.Observable.from([
  "ArrowUp",
  "ArrowUp",
  "ArrowDown",
  "ArrowDown",
  "ArrowLeft",
  "ArrowRight",
  "ArrowLeft",
  "ArrowRight",
  "KeyB",
  "KeyA",
  "Enter" // no start key, clearly.
]);
var keys = Rx.Observable.fromEvent(document, 'keyup')
  .map(e => e.code);
var matches = keys.bufferCount(11, 1)
  .mergeMap(
    last11 =>
      Rx.Observable.from(last11)
        .sequenceEqual(code)
  );
matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));
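For two sequences that have both completed, the comparison reduces to an in-order, pairwise check. The sketch below models only that finished case on arrays; sequenceEqualModel is a hypothetical helper, and the default strict-equality comparor is an assumption of this sketch.

```javascript
// Array-based model of sequenceEqual for two completed sequences:
// equal only if they have the same length and every pair matches.
function sequenceEqualModel(a, b, comparor = (x, y) => x === y) {
  if (a.length !== b.length) { return false; }
  return a.every((value, i) => comparor(value, b[i]));
}

const sameKeys = sequenceEqualModel(['KeyB', 'KeyA'], ['KeyB', 'KeyA']);     // true
const extraKey = sequenceEqualModel(['KeyB', 'KeyA'], ['KeyB']);             // false
// A custom comparor, here ignoring letter case.
const ignoreCase = (x, y) => x.toLowerCase() === y.toLowerCase();
const looseKeys = sequenceEqualModel(['keyb', 'KEYA'], ['KeyB', 'KeyA'], ignoreCase);  // true
```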
public share(): Observable<T> source
Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber, this Observable will be subscribed and emitting data. When all subscribers have unsubscribed, it will unsubscribe from the source Observable. Because the Observable is multicasting, it makes the stream hot.
This is an alias for .multicast(() => new Subject()).refCount(). It behaves similarly to .publish().refCount(), with one difference when the source Observable completes: .publish().refCount() will not resubscribe to the original source, whereas .share() will. For example, Observable.of("test").publish().refCount() will not re-emit "test" on new subscriptions, but Observable.of("test").share() will re-emit "test" to new subscriptions.
Return:
Observable<T> | An Observable that upon connection causes the source Observable to emit items to its Observers. |
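The reference-counting behaviour described for share can be sketched without RxJS. This is a simplified model, not the library's code: a "source" is assumed to be a function that takes a next callback and returns a teardown function, and the helper name share is reused here only for illustration.

```javascript
// Hypothetical model of multicasting with reference counting.
function share(subscribeToSource) {
  let listeners = [];
  let disconnect = null;

  return function subscribe(listener) {
    listeners.push(listener);
    if (listeners.length === 1) {
      // First subscriber: connect to the source exactly once.
      disconnect = subscribeToSource(value => {
        listeners.slice().forEach(l => l(value));
      });
    }
    return function unsubscribe() {
      listeners = listeners.filter(l => l !== listener);
      if (listeners.length === 0 && disconnect) {
        // Last subscriber left: release the source.
        disconnect();
        disconnect = null;
      }
    };
  };
}

// A manually driven source that counts how often it is connected.
let connections = 0;
let emit = null;
const source = next => {
  connections += 1;
  emit = next;
  return () => { emit = null; };
};

const shared = share(source);
const seenA = [];
const seenB = [];
const unsubscribeA = shared(v => seenA.push(v));
const unsubscribeB = shared(v => seenB.push(v));
emit(1);          // delivered to both subscribers
unsubscribeA();
emit(2);          // delivered to B only
unsubscribeB();   // last subscriber gone, so the source is torn down
console.log(connections, seenA, seenB);
```

Both subscribers share a single connection to the source, and a later subscription after teardown connects again.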
public shareReplay(bufferSize: *, windowTime: *, scheduler: *): * source
Params:
Name | Type | Attribute | Description |
bufferSize | * | ||
windowTime | * | ||
scheduler | * |
Return:
* |
public single(predicate: Function): Observable<T> source
Returns an Observable that emits the single item emitted by the source Observable that matches a specified predicate, if that Observable emits one such item. If the source Observable emits more than one such item or no such items, the Observer is notified of an error.
Params:
Name | Type | Attribute | Description |
predicate | Function | A predicate function to evaluate items emitted by the source Observable. |
Return:
Observable<T> | An Observable that emits the single item emitted by the source Observable that matches the predicate. |
Throw:
Delivers an EmptyError to the Observer's |
public skip(count: Number): Observable source
Returns an Observable that skips the first count items emitted by the source Observable.
Params:
Name | Type | Attribute | Description |
count | Number | The number of items emitted by the source Observable to skip. |
public skipLast(count: number): Observable<T> source
Skips the last count values emitted by the source Observable.
skipLast returns an Observable that accumulates a queue long enough to hold the first count values. As more values are received, values are taken from the front of the queue and produced on the result sequence. This causes values to be delayed.
Params:
Name | Type | Attribute | Description |
count | number | Number of elements to skip from the end of the source Observable. |
Return:
Observable<T> | An Observable that skips the last count values emitted by the source Observable. |
Throw:
When using |
Example:
var many = Rx.Observable.range(1, 5);
var skipLastTwo = many.skipLast(2);
skipLastTwo.subscribe(x => console.log(x));
// Results in:
// 1 2 3
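The queue mechanism described above can be sketched on a plain array. This is an illustrative model only (the standalone skipLast function is hypothetical and works on arrays rather than Observables); it shows why each value is delayed until count newer values have arrived.

```javascript
// Hypothetical array-based model of the skipLast queue.
function skipLast(values, count) {
  const queue = [];
  const output = [];
  for (const value of values) {
    queue.push(value);
    // Once the queue holds more than `count` values, the oldest one
    // can no longer be among the last `count`, so it is released.
    if (queue.length > count) {
      output.push(queue.shift());
    }
  }
  // Whatever is still queued when the input ends is dropped.
  return output;
}

console.log(skipLast([1, 2, 3, 4, 5], 2)); // [ 1, 2, 3 ]
```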
public skipUntil(notifier: Observable): Observable<T> source
Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
Params:
Name | Type | Attribute | Description |
notifier | Observable | The second Observable that has to emit an item before the source Observable's elements begin to be mirrored by the resulting Observable. |
Return:
Observable<T> | An Observable that skips items from the source Observable until the second Observable emits an item, then emits the remaining items. |
public skipWhile(predicate: Function): Observable<T> source
Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.
Params:
Name | Type | Attribute | Description |
predicate | Function | A function to test each item emitted from the source Observable. |
Return:
Observable<T> | An Observable that begins emitting items emitted by the source Observable when the specified predicate becomes false. |
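A small array-based sketch may help distinguish skipWhile from a filter. It is a model under the assumption that values arrive in array order; the standalone skipWhile function is hypothetical and not part of RxJS.

```javascript
// Hypothetical array-based model: skip while the predicate holds,
// then pass everything through, even values that would match again.
function skipWhile(values, predicate) {
  const output = [];
  let skipping = true;
  values.forEach((value, index) => {
    if (skipping && !predicate(value, index)) {
      skipping = false; // the condition became false: stop skipping for good
    }
    if (!skipping) {
      output.push(value);
    }
  });
  return output;
}

console.log(skipWhile([1, 2, 5, 1, 6], x => x < 3)); // [ 5, 1, 6 ]
```

The second 1 is emitted even though it satisfies the predicate, because the predicate is no longer consulted once it has returned false.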
public startWith(values: ...T, scheduler: Scheduler): Observable source
Returns an Observable that emits the items you specify as arguments before it begins to emit items emitted by the source Observable.
Params:
Name | Type | Attribute | Description |
values | ...T | Items you want the modified Observable to emit first. |
scheduler | Scheduler | An IScheduler to use for scheduling the emissions of the |
Return:
Observable | An Observable that emits the specified items and then emits the items emitted by the source Observable. |
public subscribe(observerOrNext: Observer | Function, error: Function, complete: Function): ISubscription source
Invokes an execution of an Observable and registers Observer handlers for notifications it will emit.
Use it when you have all these Observables, but still nothing is happening.
subscribe is not a regular operator, but a method that calls the Observable's internal subscribe function. This might be, for example, a function that you passed to the create static factory, but most of the time it is a library implementation, which defines what will be emitted by an Observable and when. This means that calling subscribe is actually the moment when the Observable starts its work, not when it is created, as is often thought.
Apart from starting the execution of an Observable, this method allows you to listen for values that an Observable emits, as well as for when it completes or errors. You can achieve this in the following two ways.
The first way is creating an object that implements the Observer interface. It should have the methods defined by that interface, but note that it should be just a regular JavaScript object, which you can create yourself in any way you want (ES6 class, classic function constructor, object literal etc.). In particular, do not attempt to use any RxJS implementation details to create Observers - you don't need them. Remember also that your object does not have to implement all methods. If you find yourself creating a method that doesn't do anything, you can simply omit it. Note, however, that if the error method is not provided, all errors will be left uncaught.
The second way is to give up on the Observer object altogether and simply provide callback functions in place of its methods. This means you can provide three functions as arguments to subscribe, where the first function is equivalent to the next method, the second to the error method, and the third to the complete method. Just as in the case of an Observer, if you do not need to listen for something, you can omit a function, preferably by passing undefined or null, since subscribe recognizes these functions by their position in the function call. As for the error function, just as before, if it is not provided, errors emitted by an Observable will be thrown.
Whatever style of calling subscribe you use, in both cases it returns a Subscription object. This object allows you to call unsubscribe on it, which in turn will stop the work that an Observable does and will clean up all resources that an Observable used. Note that cancelling a subscription will not call the complete callback provided to the subscribe function, which is reserved for a regular completion signal that comes from an Observable.
Remember that callbacks provided to subscribe are not guaranteed to be called asynchronously. It is the Observable itself that decides when these functions will be called. For example, the of operator emits all its values synchronously by default. Always check the documentation for how a given Observable will behave when subscribed and whether its default behavior can be modified with a Scheduler.
Params:
Name | Type | Attribute | Description |
observerOrNext | Observer | Function | (optional) Either an observer with methods to be called, or the first of three possible handlers, which is the handler for each value emitted from the subscribed Observable. |
error | Function | (optional) A handler for a terminal event resulting from an error. If no error handler is provided, the error will be thrown as unhandled. |
complete | Function | (optional) A handler for a terminal event resulting from successful completion. |
Return:
ISubscription | a subscription reference to the registered handlers |
Example:
const sumObserver = {
  sum: 0,
  next(value) {
    console.log('Adding: ' + value);
    this.sum = this.sum + value;
  },
  error() { // We actually could just remove this method,
  },        // since we do not really care about errors right now.
  complete() {
    console.log('Sum equals: ' + this.sum);
  }
};
Rx.Observable.of(1, 2, 3) // Synchronously emits 1, 2, 3 and then completes.
  .subscribe(sumObserver);
// Logs:
// "Adding: 1"
// "Adding: 2"
// "Adding: 3"
// "Sum equals: 6"

let sum = 0;
Rx.Observable.of(1, 2, 3)
  .subscribe(
    function(value) {
      console.log('Adding: ' + value);
      sum = sum + value;
    },
    undefined,
    function() {
      console.log('Sum equals: ' + sum);
    }
  );
// Logs:
// "Adding: 1"
// "Adding: 2"
// "Adding: 3"
// "Sum equals: 6"

const subscription = Rx.Observable.interval(1000).subscribe(
  num => console.log(num),
  undefined,
  () => console.log('completed!') // Will not be called, even
);                                // when cancelling subscription
setTimeout(() => {
  subscription.unsubscribe();
  console.log('unsubscribed!');
}, 2500);
// Logs:
// 0 after 1s
// 1 after 2s
// "unsubscribed!" after 2.5s
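The two calling styles described for subscribe (an Observer object, or up to three positional callbacks) can be pictured as being normalized into one internal shape. The sketch below is a plain-JavaScript illustration of that idea, not the RxJS internals; toObserver is a hypothetical helper name.

```javascript
// Hypothetical helper: turn either calling style into a full observer.
function toObserver(observerOrNext, error, complete) {
  const noop = () => {};
  const rethrow = err => { throw err; }; // no error handler: the error is thrown

  if (observerOrNext !== null && typeof observerOrNext === 'object') {
    // Style 1: a plain object; missing methods are simply filled in.
    const o = observerOrNext;
    return {
      next: typeof o.next === 'function' ? v => o.next(v) : noop,
      error: typeof o.error === 'function' ? e => o.error(e) : rethrow,
      complete: typeof o.complete === 'function' ? () => o.complete() : noop
    };
  }
  // Style 2: callbacks identified purely by argument position, which is
  // why an unused slot is passed as undefined or null.
  return {
    next: typeof observerOrNext === 'function' ? observerOrNext : noop,
    error: typeof error === 'function' ? error : rethrow,
    complete: typeof complete === 'function' ? complete : noop
  };
}

const log = [];
const fromCallbacks = toObserver(v => log.push('next ' + v), undefined, () => log.push('done'));
fromCallbacks.next(1);
fromCallbacks.complete();

const fromObject = toObserver({ next(v) { log.push('object ' + v); } });
fromObject.next(2);
fromObject.complete(); // omitted method: nothing happens
console.log(log);
```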
public subscribeOn(scheduler: Scheduler): Observable<T> source
Asynchronously subscribes Observers to this Observable on the specified IScheduler.
Params:
Name | Type | Attribute | Description |
scheduler | Scheduler | The IScheduler to perform subscription actions on. |
Return:
Observable<T> | The source Observable modified so that its subscriptions happen on the specified IScheduler. |
public switch(): Observable<T> source
Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables.
Flattens an Observable-of-Observables by dropping the previous inner Observable once a new one appears.
switch subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, the output Observable subscribes to the inner Observable and begins emitting the items emitted by that. So far, it behaves like mergeAll. However, when a new inner Observable is emitted, switch unsubscribes from the earlier-emitted inner Observable and subscribes to the new inner Observable and begins emitting items from it. It continues to behave like this for subsequent inner Observables.
Return:
Observable<T> | An Observable that emits the items emitted by the Observable most recently emitted by the source Observable. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
// Each click event is mapped to an Observable that ticks every second
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var switched = higherOrder.switch();
// The outcome is that `switched` is essentially a timer that restarts
// on every click. The interval Observables from older clicks do not merge
// with the current interval Observable.
switched.subscribe(x => console.log(x));
public switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source
Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.
Maps each value to an Observable, then flattens all of these inner Observables using switch.
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns a (so-called "inner") Observable. Each time it observes one of these inner Observables, the output Observable begins emitting the items emitted by that inner Observable. When a new inner Observable is emitted, switchMap stops emitting items from the earlier-emitted inner Observable and begins emitting items from the new one. It continues to behave like this for subsequent inner Observables.
Params:
Name | Type | Attribute | Description |
project | function(value: T, ?index: number): ObservableInput | A function that, when applied to an item emitted by the source Observable, returns an Observable. |
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any | A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are outerValue, innerValue, outerIndex, and innerIndex. |
Return:
Observable | An Observable that emits the result of applying the
projection function (and the optional |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.switchMap((ev) => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
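The "drop the previous inner Observable" rule can be sketched in plain JavaScript. This is a simplified model, not the RxJS implementation: an inner source is assumed to be a function that takes a next callback and returns a teardown function, and this standalone switchMap helper, emitters, and stopped are names invented for the illustration. Passing a project function that returns its argument unchanged gives the behaviour of switch.

```javascript
// Hypothetical model: returns a handler to call for each outer value.
function switchMap(project, output) {
  let cancelPrevious = null;
  let index = 0;

  return function onOuterValue(value) {
    if (cancelPrevious) cancelPrevious(); // drop the earlier inner source
    let active = true;
    const subscribeInner = project(value, index++);
    const stopInner = subscribeInner(innerValue => {
      if (active) output(innerValue); // only the latest inner may emit
    });
    cancelPrevious = () => {
      active = false;
      if (typeof stopInner === 'function') stopInner();
    };
  };
}

// Manually driven inner sources, keyed by label, for demonstration.
const emitters = {};
const stopped = [];
const project = label => next => {
  emitters[label] = v => next(label + v);
  return () => stopped.push(label);
};

const seen = [];
const onClick = switchMap(project, v => seen.push(v));
onClick('a');
emitters.a(0);
emitters.a(1);
onClick('b');   // switches: 'a' is unsubscribed
emitters.a(2);  // ignored, 'a' is no longer the latest inner source
emitters.b(0);
console.log(seen, stopped);
```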
public switchMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source
Projects each source value to the same Observable which is flattened multiple times with switch in the output Observable.
It's like switchMap, but maps each value always to the same inner Observable.
Maps each source value to the given Observable innerObservable regardless of the source value, and then flattens those resulting Observables into one single Observable, which is the output Observable. The output Observable emits values only from the most recently emitted instance of innerObservable.
Params:
Name | Type | Attribute | Description |
innerObservable | ObservableInput | An Observable to replace each value from the source Observable. |
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any | A function to produce the value on the output Observable based on the values and the indices of the source (outer) emission and the inner Observable emission. The arguments passed to this function are outerValue, innerValue, outerIndex, and innerIndex. |
Return:
Observable | An Observable that emits items from the given
|
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.switchMapTo(Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
public take(count: number): Observable<T> source
Emits only the first count values emitted by the source Observable.
Takes the first count values from the source, then completes.
take returns an Observable that emits only the first count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. After that, it completes, regardless of whether the source completes.
Params:
Name | Type | Attribute | Description |
count | number | The maximum number of |
Return:
Observable<T> | An Observable that emits only the first |
Throw:
When using |
Example:
var interval = Rx.Observable.interval(1000);
var five = interval.take(5);
five.subscribe(x => console.log(x));
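The point that take completes on its own, whether or not the source ever completes, can be illustrated with an endless generator. This is a pull-based model in plain JavaScript rather than RxJS; the standalone take and naturals functions are hypothetical.

```javascript
// An endless source: it never "completes" by itself.
function* naturals() {
  let n = 0;
  while (true) yield n++;
}

// Hypothetical iterable-based model of take.
function take(iterable, count) {
  const output = [];
  if (count <= 0) return output; // nothing requested: do not touch the source
  for (const value of iterable) {
    output.push(value);
    if (output.length >= count) break; // stop as soon as `count` values are seen
  }
  return output;
}

console.log(take(naturals(), 5)); // [ 0, 1, 2, 3, 4 ]
console.log(take([1, 2], 5));     // [ 1, 2 ]
```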
public takeLast(count: number): Observable<T> source
Emits only the last count values emitted by the source Observable.
Remembers the latest count values, then emits those only when the source completes.
takeLast returns an Observable that emits at most the last count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. This operator must wait until the complete notification emission from the source in order to emit the next values on the output Observable, because otherwise it is impossible to know whether or not more values will be emitted on the source. For this reason, all values are emitted synchronously, followed by the complete notification.
Params:
Name | Type | Attribute | Description |
count | number | The maximum number of values to emit from the end of the sequence of values emitted by the source Observable. |
Return:
Observable<T> | An Observable that emits at most the last count values emitted by the source Observable. |
Throw:
When using |
Example:
var many = Rx.Observable.range(1, 100);
var lastThree = many.takeLast(3);
lastThree.subscribe(x => console.log(x));
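The reason takeLast stays silent until completion can be seen in a small observer-style model. This is a sketch in plain JavaScript, not the RxJS source; createTakeLast and the output object with next and complete methods are assumptions made for the illustration.

```javascript
// Hypothetical model: keeps only the newest `count` values and
// releases them in one synchronous burst when the source completes.
function createTakeLast(count, output) {
  const ring = [];
  return {
    next(value) {
      ring.push(value);
      if (ring.length > count) ring.shift(); // forget the oldest value
    },
    complete() {
      ring.forEach(value => output.next(value));
      output.complete();
    }
  };
}

const received = [];
const lastThree = createTakeLast(3, {
  next: value => received.push(value),
  complete: () => received.push('complete')
});

for (let i = 1; i <= 100; i++) lastThree.next(i);
const emittedBeforeComplete = received.length; // still 0: nothing emitted yet
lastThree.complete();
console.log(emittedBeforeComplete, received);
```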
public takeUntil(notifier: Observable): Observable<T> source
Emits the values emitted by the source Observable until a notifier Observable emits a value.
Lets values pass until a second Observable, notifier, emits something. Then, it completes.
takeUntil subscribes and begins mirroring the source Observable. It also monitors a second Observable, notifier, that you provide. If the notifier emits a value, the output Observable stops mirroring the source Observable and completes.
Params:
Name | Type | Attribute | Description |
notifier | Observable | The Observable whose first emitted value will
cause the output Observable of |
Return:
Observable<T> | An Observable that emits the values from the source
Observable until such time as |
Example:
var interval = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = interval.takeUntil(clicks);
result.subscribe(x => console.log(x));
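The interplay of the two Observables can be modelled as one merged timeline of events. The sketch below is an assumption-laden simplification in plain JavaScript (the standalone takeUntil function and the from/value event shape are hypothetical); it only shows that the first notifier value ends the output.

```javascript
// Hypothetical timeline model: each event says which input produced it.
function takeUntil(events) {
  const output = [];
  for (const event of events) {
    if (event.from === 'notifier') break; // first notifier value: complete
    output.push(event.value);
  }
  return output;
}

const timeline = [
  { from: 'source', value: 0 },
  { from: 'source', value: 1 },
  { from: 'notifier', value: 'click' },
  { from: 'source', value: 2 } // never reaches the output
];
console.log(takeUntil(timeline)); // [ 0, 1 ]
```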
public takeWhile(predicate: function(value: T, index: number): boolean): Observable<T> source
Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.
Takes values from the source only while they pass the condition given. When the first value does not satisfy, it completes.
takeWhile subscribes and begins mirroring the source Observable. Each value emitted on the source is given to the predicate function which returns a boolean, representing a condition to be satisfied by the source values. The output Observable emits the source values until such time as the predicate returns false, at which point takeWhile stops mirroring the source Observable and completes the output Observable.
Return:
Observable<T> | An Observable that emits the values from the source
Observable so long as each value satisfies the condition defined by the
|
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.takeWhile(ev => ev.clientX > 200);
result.subscribe(x => console.log(x));
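An array-based sketch shows how takeWhile differs from filtering: it stops at the first failing value instead of skipping it. The standalone takeWhile function below is a hypothetical model in plain JavaScript, and the numbers stand in for clientX values like those in the example.

```javascript
// Hypothetical array-based model of takeWhile.
function takeWhile(values, predicate) {
  const output = [];
  for (let index = 0; index < values.length; index++) {
    if (!predicate(values[index], index)) break; // first failure: complete
    output.push(values[index]);
  }
  return output;
}

const clientXs = [250, 300, 150, 400];
console.log(takeWhile(clientXs, x => x > 200));  // [ 250, 300 ]
console.log(clientXs.filter(x => x > 200));      // [ 250, 300, 400 ]
```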
public throttle(durationSelector: function(value: T): SubscribableOrPromise, config: Object): Observable<T> source
Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.
It's like throttleTime, but the silencing duration is determined by a second Observable.
throttle emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, it is forwarded to the output Observable, and then the timer is enabled by calling the durationSelector function with the source value, which returns the "duration" Observable. When the duration Observable emits a value or completes, the timer is disabled, and this process repeats for the next source value.
Params:
Name | Type | Attribute | Description |
durationSelector | function(value: T): SubscribableOrPromise | A function that receives a value from the source Observable, for computing the silencing duration for each source value, returned as an Observable or a Promise. |
config | Object | a configuration object to define |
Return:
Observable<T> | An Observable that performs the throttle operation to limit the rate of emissions from the source. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttle(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
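The timer logic described for throttle can be sketched over a list of timestamped values. This is a simplified model in plain JavaScript, not the RxJS implementation: the duration Observable is replaced by a number of milliseconds returned from a hypothetical durationOf function, and only the behaviour described above (emit first, then stay silent) is modelled.

```javascript
// Hypothetical timeline model: events are { time, value } in ascending time.
function throttle(events, durationOf) {
  const output = [];
  let silentUntil = -Infinity; // timer disabled initially
  for (const { time, value } of events) {
    if (time >= silentUntil) {
      output.push(value);                     // timer disabled: forward the value
      silentUntil = time + durationOf(value); // then enable the timer
    }
    // otherwise the timer is enabled and the value is ignored
  }
  return output;
}

const clicks = [
  { time: 0, value: 'a' },
  { time: 300, value: 'b' },
  { time: 900, value: 'c' },
  { time: 1200, value: 'd' },
  { time: 1500, value: 'e' },
  { time: 2300, value: 'f' }
];
console.log(throttle(clicks, () => 1000)); // [ 'a', 'd', 'f' ]
```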
public throttleTime(duration: number, scheduler: Scheduler): Observable<T>
Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.
Lets a value pass, then ignores source values for the next duration milliseconds.
throttleTime emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, it is forwarded to the output Observable, and then the timer is enabled. After duration milliseconds (or the time unit determined internally by the optional scheduler) has passed, the timer is disabled, and this process repeats for the next source value. Optionally takes an IScheduler for managing timers.
Params:
Name | Type | Attribute | Description |
duration | number | Time to wait before emitting another value after emitting the last value, measured in milliseconds or the time unit determined internally by the optional scheduler. |
|
scheduler | Scheduler | The IScheduler to use for managing the timers that handle the throttling. |
Return:
Observable<T> | An Observable that performs the throttle operation to limit the rate of emissions from the source. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttleTime(1000);
result.subscribe(x => console.log(x));
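To see which values pass and which are ignored, the same timer rule can be replayed over a list of click times. A minimal sketch, assuming a hypothetical `simulateThrottleTime` helper and plain millisecond timestamps in place of a real scheduler; it is not RxJS code.

```javascript
// Illustrative model only; simulateThrottleTime is a hypothetical helper, not an RxJS API.
// events: array of { time, value } sorted by time in milliseconds.
function simulateThrottleTime(events, duration) {
  const passed = [];
  const ignored = [];
  let timerEndsAt = null; // null means the timer is disabled

  for (const ev of events) {
    if (timerEndsAt !== null && ev.time < timerEndsAt) {
      ignored.push(ev.value); // timer enabled: value is dropped
    } else {
      passed.push(ev.value); // timer disabled: value is forwarded
      timerEndsAt = ev.time + duration; // timer enabled for `duration` ms
    }
  }
  return { passed, ignored };
}

const clickTimes = [0, 200, 700, 1100, 1500, 2300];
const clickEvents = clickTimes.map((time, i) => ({ time, value: 'click ' + (i + 1) }));
const throttleTimeResult = simulateThrottleTime(clickEvents, 1000);
console.log(throttleTimeResult.passed);  // [ 'click 1', 'click 4', 'click 6' ]
console.log(throttleTimeResult.ignored); // [ 'click 2', 'click 3', 'click 5' ]
```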
public timeInterval(scheduler: *): Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T>
Params:
Name | Type | Attribute | Description |
scheduler | * |
public timeout(due: number | Date, scheduler: Scheduler): Observable<T>
Errors if the Observable does not emit a value within a given time span.
Times out on an Observable that doesn't emit values fast enough.
The timeout operator accepts either a number or a Date as its argument.
If a number is provided, it returns an Observable that behaves like the source Observable, unless there is a period of time in which no value is emitted. So if you provide 100 as the argument and the first value arrives 50ms after the moment of subscription, that value is simply re-emitted by the resulting Observable. If, however, a further 100ms passes without a second value being emitted, the stream ends with an error and the source Observable is unsubscribed. These checks are performed throughout the whole lifecycle of the Observable, from the moment it is subscribed to until it completes or errors itself. Thus every value must be emitted within the specified period after the previous value.
If a Date is provided, the returned Observable behaves differently. It emits an error if the source Observable has not completed before the provided Date, and in that case the source Observable is unsubscribed. This means that the periods between emissions of particular values do not matter. Other than that, the resulting stream behaves just like the source Observable.
timeout also accepts a Scheduler as a second parameter. It is used to schedule the moment (or moments) when the returned Observable checks whether the source stream has emitted a value or completed.
Example:
const seconds = Rx.Observable.interval(1000);
seconds.timeout(1100) // Let's use bigger timespan to be safe,
// since `interval` might fire a bit later than scheduled.
.subscribe(
value => console.log(value), // Will emit numbers just as regular `interval` would.
err => console.log(err) // Will never be called.
);
seconds.timeout(900).subscribe(
value => console.log(value), // Will never be called.
err => console.log(err) // Will emit error before even first value is emitted,
// since it did not arrive within 900ms period.
);
const seconds = Rx.Observable.interval(1000);
seconds.timeout(new Date("December 17, 2020 03:24:00"))
.subscribe(
value => console.log(value), // Will emit values as regular `interval` would
// until December 17, 2020 at 03:24:00.
err => console.log(err) // On December 17, 2020 at 03:24:00 it will emit an error,
// since Observable did not complete by then.
);
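The numeric form of the check can be modelled as a pure function over emission times. This is a sketch under stated assumptions, not the RxJS implementation: `simulateTimeout`, the `{ time, value }` event shape, and the `completeAt` argument are hypothetical, times are milliseconds since subscription, and the Date form and the Scheduler are not modelled.

```javascript
// Illustrative model only; simulateTimeout is a hypothetical helper, not an RxJS API.
// events: array of { time, value } sorted by time (ms since subscription).
// completeAt: time at which the source completes; Infinity means it never does.
function simulateTimeout(events, due, completeAt = Infinity) {
  const notifications = [];
  let lastTime = 0; // the first check starts at the moment of subscription

  for (const ev of events) {
    if (ev.time - lastTime > due) {
      // No value arrived within `due` ms of the previous one: error and stop.
      notifications.push({ kind: 'error', time: lastTime + due });
      return notifications;
    }
    notifications.push({ kind: 'next', time: ev.time, value: ev.value });
    lastTime = ev.time;
  }

  if (completeAt - lastTime > due) {
    // The source stayed silent for longer than `due` before completing.
    notifications.push({ kind: 'error', time: lastTime + due });
  } else {
    notifications.push({ kind: 'complete', time: completeAt });
  }
  return notifications;
}

const ticks = [
  { time: 1000, value: 0 },
  { time: 2000, value: 1 },
  { time: 3000, value: 2 },
];
const relaxed = simulateTimeout(ticks, 1100, 3000); // every gap fits in 1100 ms
const strict = simulateTimeout(ticks, 900, 3000);   // first value is already too late
console.log(relaxed.map(n => n.kind)); // [ 'next', 'next', 'next', 'complete' ]
console.log(strict);                   // [ { kind: 'error', time: 900 } ]
```

Calling `simulateTimeout([{ time: 50, value: 'x' }], 100)` reproduces the case from the description: the value at 50ms is re-emitted, and the error follows at 150ms.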
public timeoutWith(due: number | Date, withObservable: Observable<T>, scheduler: Scheduler): Observable<T>
If the Observable does not emit a value within a given time span, subscribes to the second Observable instead of emitting an error.
It's a version of the timeout operator that lets you specify a fallback Observable.
timeoutWith is a variation of the timeout operator. It behaves exactly the same, still accepting as a first argument either a number or a Date, which control, respectively, when values of the source Observable should be emitted or when it should complete. The only difference is that it accepts a second, required parameter. This parameter should be an Observable, which will be subscribed to when the source Observable fails any timeout check. So whenever a regular timeout would emit an error, timeoutWith will instead start re-emitting values from the second Observable. Note that this fallback Observable is not checked for timeouts itself, so it can emit values and complete at arbitrary points in time. From the moment of the second subscription, the Observable returned from timeoutWith simply mirrors the fallback stream. When that stream completes, it completes as well.
The Scheduler, which in the case of timeout is provided as a second argument, can still be provided here as a third, optional parameter. It is still used to schedule timeout checks and, as a consequence, to determine when the second Observable will be subscribed to, since the subscription happens immediately after a failed check.
Params:
Name | Type | Attribute | Description |
due | number | Date | Number specifying the period within which the Observable must emit values, or Date specifying the time by which the Observable should complete. |
|
withObservable | Observable<T> | Observable that will be subscribed to if the source fails a timeout check. |
|
scheduler | Scheduler | Scheduler controlling when timeout checks occur. |
Return:
Observable<T> | Observable that mirrors the behaviour of the source or, when a timeout check fails, of the Observable passed as the second parameter. |
Example:
const seconds = Rx.Observable.interval(1000);
const minutes = Rx.Observable.interval(60 * 1000);
seconds.timeoutWith(900, minutes)
.subscribe(
value => console.log(value), // After 900ms, will start emitting `minutes`,
// since first value of `seconds` will not arrive fast enough.
err => console.log(err) // Would be called after 900ms in case of `timeout`,
// but here will never be called.
);
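The switch to the fallback can be sketched as a pure function. This is an illustrative model under assumptions, not RxJS code: `simulateTimeoutWith` is a hypothetical helper, the source is assumed to stay silent after its last listed value, and fallback event times are measured from the moment the fallback is subscribed to.

```javascript
// Illustrative model only; simulateTimeoutWith is a hypothetical helper, not an RxJS API.
// sourceEvents: array of { time, value } sorted by time (ms since subscription).
// fallbackEvents: array of { time, value } with times relative to the
// moment the fallback is subscribed to (assumption of this sketch).
function simulateTimeoutWith(sourceEvents, due, fallbackEvents) {
  const output = [];
  let lastTime = 0; // the first check starts at the moment of subscription

  for (const ev of sourceEvents) {
    if (ev.time - lastTime > due) {
      break; // the timeout check fails before this value arrives
    }
    output.push({ time: ev.time, value: ev.value });
    lastTime = ev.time;
  }

  // The check fails `due` ms after the last accepted value (or after
  // subscription), and the fallback is subscribed to at that moment.
  const switchTime = lastTime + due;
  for (const ev of fallbackEvents) {
    // The fallback is not checked for timeouts, so any gap is allowed.
    output.push({ time: switchTime + ev.time, value: ev.value });
  }
  return output;
}

const slowSource = [
  { time: 1000, value: 0 },
  { time: 2000, value: 1 },
];
const fallback = [
  { time: 500, value: 'x' },
  { time: 5000, value: 'y' },
];
const timeoutWithResult = simulateTimeoutWith(slowSource, 900, fallback);
console.log(timeoutWithResult);
// [ { time: 1400, value: 'x' }, { time: 5900, value: 'y' } ]
```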
public timestamp(scheduler: *): Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T>
Params:
Name | Type | Attribute | Description |
scheduler | * |
public toArray(): Observable<any[]> | WebSocketSubject<T> | Observable<T>
Collects all source emissions and emits them as an array when the source completes.
Get all values inside an array when the source completes.
toArray will wait until the source Observable completes before emitting the array containing all emissions. When the source Observable errors, no array will be emitted.
Example:
const input = Rx.Observable.interval(100).take(4);
input.toArray()
.subscribe(arr => console.log(arr)); // [0,1,2,3]
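The difference between the completion and error cases can be shown with a small model that replays a list of notifications. A minimal sketch, assuming a hypothetical `collectToArray` helper and a made-up `{ kind, value }` notification shape; it is not RxJS code.

```javascript
// Illustrative model only; collectToArray is a hypothetical helper, not an RxJS API.
// notifications: array of { kind: 'next', value } | { kind: 'error', error } | { kind: 'complete' }.
function collectToArray(notifications) {
  const buffer = [];
  for (const n of notifications) {
    if (n.kind === 'next') {
      buffer.push(n.value); // keep collecting, emit nothing yet
    } else if (n.kind === 'error') {
      return { error: n.error }; // the error is passed on, no array is emitted
    } else if (n.kind === 'complete') {
      return { emitted: buffer }; // the array is emitted on completion
    }
  }
  return {}; // the source has neither completed nor errored: nothing emitted
}

const completed = collectToArray([
  { kind: 'next', value: 0 },
  { kind: 'next', value: 1 },
  { kind: 'next', value: 2 },
  { kind: 'next', value: 3 },
  { kind: 'complete' },
]);
const failed = collectToArray([
  { kind: 'next', value: 0 },
  { kind: 'error', error: 'boom' },
]);
console.log(completed.emitted); // [ 0, 1, 2, 3 ]
console.log(failed.emitted);    // undefined
```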
public window(windowBoundaries: Observable<any>): Observable<Observable<T>>
Branch out the source Observable values as a nested Observable whenever windowBoundaries emits.
It's like buffer, but emits a nested Observable instead of an array.
Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits connected, non-overlapping windows. It emits the current window and opens a new one whenever the Observable windowBoundaries emits an item. Because each window is an Observable, the output is a higher-order Observable.
Params:
Name | Type | Attribute | Description |
windowBoundaries | Observable<any> | An Observable that completes the previous window and starts a new window. |
Return:
Observable<Observable<T>> | An Observable of windows, which are Observables emitting values of the source Observable. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var result = clicks.window(interval)
.map(win => win.take(2)) // each window has at most 2 emissions
.mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));
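How the boundaries split the source into connected, non-overlapping windows can be sketched with arrays standing in for the nested Observables. This is an illustrative model, not RxJS code: `simulateWindow` is a hypothetical helper, times are in milliseconds, and boundaries later than the last source value are ignored.

```javascript
// Illustrative model only; simulateWindow is a hypothetical helper, not an RxJS API.
// events: array of { time, value } sorted by time in milliseconds.
// boundaryTimes: sorted times at which windowBoundaries emits.
function simulateWindow(events, boundaryTimes) {
  const windows = [[]]; // the first window is open from the start
  let next = 0;         // index of the next boundary not yet applied

  for (const ev of events) {
    // Every boundary up to this value closes the current window and opens a new one.
    while (next < boundaryTimes.length && boundaryTimes[next] <= ev.time) {
      windows.push([]);
      next++;
    }
    windows[windows.length - 1].push(ev.value);
  }
  return windows;
}

const windowed = simulateWindow(
  [
    { time: 100, value: 'a' },
    { time: 400, value: 'b' },
    { time: 700, value: 'c' },
    { time: 1200, value: 'd' },
    { time: 2500, value: 'e' },
  ],
  [1000, 2000, 3000]
);
console.log(windowed); // [ [ 'a', 'b', 'c' ], [ 'd' ], [ 'e' ] ]

// Counterpart of `.map(win => win.take(2)).mergeAll()` on the arrays:
const firstTwoPerWindow = [].concat(...windowed.map(win => win.slice(0, 2)));
console.log(firstTwoPerWindow); // [ 'a', 'b', 'd', 'e' ]
```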
See:
- windowCount
- windowTime
- windowToggle
- windowWhen
- buffer
public windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>>
Branch out the source Observable values as a nested Observable with each nested Observable emitting at most windowSize values.
It's like bufferCount, but emits a nested Observable instead of an array.
Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits windows every startWindowEvery items, each containing no more than windowSize items. When the source Observable completes or encounters an error, the output Observable emits the current window and propagates the notification from the source Observable. If startWindowEvery is not provided, then new windows are started immediately at the start of the source and when each window completes with size windowSize.
Params:
Name | Type | Attribute | Description |
windowSize | number | The maximum number of values emitted by each window. |
|
startWindowEvery | number |
|
Interval at which to start a new window.
For example if |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowCount(3)
.map(win => win.skip(1)) // skip first of every 3 clicks
.mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowCount(2, 3)
.mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));
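The interplay of windowSize and startWindowEvery can be sketched with array slices standing in for the nested Observables. This is an illustrative model, not the RxJS implementation: `simulateWindowCount` is a hypothetical helper, and windows that would contain no values are omitted.

```javascript
// Illustrative model only; simulateWindowCount is a hypothetical helper, not an RxJS API.
// values: the source values in order of emission.
function simulateWindowCount(values, windowSize, startWindowEvery) {
  // Without startWindowEvery, a new window starts when the previous one is full.
  const step = startWindowEvery > 0 ? startWindowEvery : windowSize;
  if (!(step > 0)) {
    throw new RangeError('windowSize must be a positive number');
  }
  const windows = [];
  for (let start = 0; start < values.length; start += step) {
    // Each window holds at most windowSize values from its starting point.
    windows.push(values.slice(start, start + windowSize));
  }
  return windows;
}

const clicksSeen = [1, 2, 3, 4, 5, 6, 7];

// Counterpart of windowCount(3) followed by skip(1) on each window:
const byThree = simulateWindowCount(clicksSeen.slice(0, 6), 3);
console.log(byThree); // [ [ 1, 2, 3 ], [ 4, 5, 6 ] ]
console.log([].concat(...byThree.map(win => win.slice(1)))); // [ 2, 3, 5, 6 ]

// Counterpart of windowCount(2, 3): every third value falls between windows.
const twoOfEveryThree = simulateWindowCount(clicksSeen, 2, 3);
console.log(twoOfEveryThree); // [ [ 1, 2 ], [ 4, 5 ], [ 7 ] ]
```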
See:
- window
- windowTime
- windowToggle
- windowWhen
- bufferCount
public windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>>
Branch out the source Observable values as a nested Observable starting from an emission from openings and ending when the output of closingSelector emits.
It's like bufferToggle, but emits a nested Observable instead of an array.
Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits windows that contain those items emitted by the source Observable between the time when the openings Observable emits an item and when the Observable returned by closingSelector emits an item.
Params:
Name | Type | Attribute | Description |
openings | Observable<O> | An observable of notifications to start new windows. |
|
closingSelector | function(value: O): Observable | A function that takes
the value emitted by the |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var result = clicks.windowToggle(openings, i =>
i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
).mergeAll();
result.subscribe(x => console.log(x));
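The open/close pairing can be sketched with arrays standing in for the nested Observables. This is an illustrative model, not RxJS code: `simulateWindowToggle` is a hypothetical helper, and `closeAfter` is assumed to return the number of milliseconds after an opening at which the closing Observable signals, with 0 meaning the window closes as soon as it opens.

```javascript
// Illustrative model only; simulateWindowToggle is a hypothetical helper, not an RxJS API.
// events:   array of { time, value } from the source, times in milliseconds.
// openings: array of { time, value } emitted by the openings Observable.
// closeAfter(value): assumed ms between an opening and its closing signal.
function simulateWindowToggle(events, openings, closeAfter) {
  return openings.map(opening => {
    const closeTime = opening.time + closeAfter(opening.value);
    // A window holds the source values seen while it is open.
    return events
      .filter(ev => ev.time >= opening.time && ev.time < closeTime)
      .map(ev => ev.value);
  });
}

const toggled = simulateWindowToggle(
  [
    { time: 1100, value: 'a' },
    { time: 2100, value: 'b' },
    { time: 2400, value: 'c' },
    { time: 2700, value: 'd' },
    { time: 3100, value: 'e' },
  ],
  [
    { time: 1000, value: 0 },
    { time: 2000, value: 1 },
    { time: 3000, value: 2 },
  ],
  i => (i % 2 ? 500 : 0) // odd openings stay open for 500 ms
);
console.log(toggled);                // [ [], [ 'b', 'c' ], [] ]
console.log([].concat(...toggled));  // [ 'b', 'c' ]
```

Unlike window and windowWhen, nothing forces these windows to be connected: values that arrive while no window is open are dropped, and windows may overlap if a new opening arrives before an earlier window closes.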
See:
- window
- windowCount
- windowTime
- windowWhen
- bufferToggle
public windowWhen(closingSelector: function(): Observable): Observable<Observable<T>>
Branch out the source Observable values as a nested Observable using a factory function of closing Observables to determine when to start a new window.
It's like bufferWhen, but emits a nested Observable instead of an array.
Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits connected, non-overlapping windows. It emits the current window and opens a new one whenever the Observable produced by the specified closingSelector function emits an item. The first window is opened immediately when subscribing to the output Observable.
Params:
Name | Type | Attribute | Description |
closingSelector | function(): Observable | A function that takes no
arguments and returns an Observable that signals (on either |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks
.windowWhen(() => Rx.Observable.interval(1000 + Math.random() * 4000))
.map(win => win.take(2)) // each window has at most 2 emissions
.mergeAll(); // flatten the Observable-of-Observables
result.subscribe(x => console.log(x));
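The way each window gets its own closing signal can be sketched with arrays standing in for the nested Observables. This is an illustrative model, not RxJS code: `simulateWindowWhen` is a hypothetical helper, and `closingDelay` is assumed to return the number of milliseconds until the closing Observable for the current window emits.

```javascript
// Illustrative model only; simulateWindowWhen is a hypothetical helper, not an RxJS API.
// events: array of { time, value } sorted by time in milliseconds.
// closingDelay(): assumed ms until the closing signal of the window being opened.
function simulateWindowWhen(events, closingDelay) {
  const nextDelay = () => {
    const delay = closingDelay();
    if (!(delay > 0)) {
      throw new RangeError('closingDelay must return a positive number');
    }
    return delay;
  };

  const windows = [[]];          // the first window opens immediately
  let windowEndsAt = nextDelay(); // closing time of the current window

  for (const ev of events) {
    while (ev.time >= windowEndsAt) {
      // The closing signal fired: open the next window and ask for a new delay.
      windows.push([]);
      windowEndsAt += nextDelay();
    }
    windows[windows.length - 1].push(ev.value);
  }
  return windows;
}

// Fixed delays keep the example deterministic (the example above uses random ones).
const delays = [1000, 3000, 2000];
let delayIndex = 0;
const whenWindows = simulateWindowWhen(
  [
    { time: 200, value: 'a' },
    { time: 900, value: 'b' },
    { time: 1500, value: 'c' },
    { time: 3900, value: 'd' },
    { time: 4100, value: 'e' },
  ],
  () => delays[delayIndex++ % delays.length]
);
console.log(whenWindows); // [ [ 'a', 'b' ], [ 'c', 'd' ], [ 'e' ] ]
```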
See:
- window
- windowCount
- windowTime
- windowToggle
- bufferWhen
public withLatestFrom(other: ObservableInput, project: Function): Observable
Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.
Whenever the source Observable emits a value, it computes a formula using that value plus the latest values from other input Observables, then emits the output of that formula.
withLatestFrom combines each value from the source Observable (the instance) with the latest values from the other input Observables only when the source emits a value, optionally using a project function to determine the value to be emitted on the output Observable. All input Observables must emit at least one value before the output Observable will emit a value.
Params:
Name | Type | Attribute | Description |
other | ObservableInput | An input Observable to combine with the source Observable. More than one input Observable may be given as arguments. |
|
project | Function |
|
Projection function for combining values
together. Receives all values in order of the Observables passed, where the
first parameter is a value from the source Observable. (e.g.
|
Return:
Observable | An Observable of projected values from the most recent values from each input Observable, or an array of the most recent values from each input Observable. |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var result = clicks.withLatestFrom(timer);
result.subscribe(x => console.log(x));
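The rule that only source emissions trigger output, and that nothing is emitted until the other input has produced a value, can be sketched as a pure function. This is an illustrative model, not RxJS code: `simulateWithLatestFrom` is a hypothetical helper limited to a single other input, with times in milliseconds.

```javascript
// Illustrative model only; simulateWithLatestFrom is a hypothetical helper, not an RxJS API.
// sourceEvents, otherEvents: arrays of { time, value } sorted by time in milliseconds.
// project: optional function (sourceValue, latestOther) => output value.
function simulateWithLatestFrom(sourceEvents, otherEvents, project) {
  const output = [];
  let next = 0;          // index of the next unseen value of the other input
  let hasLatest = false; // becomes true once the other input has emitted
  let latest;

  for (const ev of sourceEvents) {
    // Catch up on everything the other input emitted before this source value.
    while (next < otherEvents.length && otherEvents[next].time <= ev.time) {
      latest = otherEvents[next].value;
      hasLatest = true;
      next++;
    }
    if (!hasLatest) {
      continue; // the other input has not emitted yet: nothing is output
    }
    output.push(project ? project(ev.value, latest) : [ev.value, latest]);
  }
  return output;
}

const clickStream = [
  { time: 500, value: 'c1' },
  { time: 1500, value: 'c2' },
  { time: 3200, value: 'c3' },
];
const timerStream = [
  { time: 1000, value: 0 },
  { time: 2000, value: 1 },
  { time: 3000, value: 2 },
];
const pairs = simulateWithLatestFrom(clickStream, timerStream);
const labels = simulateWithLatestFrom(clickStream, timerStream, (c, t) => c + '@' + t);
console.log(pairs);  // [ [ 'c2', 0 ], [ 'c3', 2 ] ]
console.log(labels); // [ 'c2@0', 'c3@2' ]
```

The timer value 1 never appears in the output because no click arrived while it was the latest value.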
public zip(observables: *): Observable<R>
Params:
Name | Type | Attribute | Description |
observables | * |
public zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T>
Params:
Name | Type | Attribute | Description |
project | * |
public zipProto(observables: *): Observable<R>
Params:
Name | Type | Attribute | Description |
observables | * |