Timeout

Mirrors the source Observable, but issues an error notification if a particular period of time elapses without any emitted items.

The Timeout operator allows you to abort an Observable with an onError termination if that Observable fails to emit any items during a specified span of time.

Language-Specific Information:

RxGroovy implements this operator as timeout, in several variants.

timeout

The first variant accepts parameters that define a duration of time (a quantity of time, and a TimeUnit that this quantity is denominated in). Each time the source Observable emits an item, timeout starts a timer, and if that timer exceeds the duration before the source Observable emits another item, timeout terminates its Observable with an error (TimeoutException).

By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to timeout.
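The timer-restart behavior described above can be sketched in plain JavaScript without any Rx library. This is a minimal model, not RxGroovy's implementation: `withTimeout`, `createVirtualScheduler`, and the observer shape (`onNext`, `onError`, `onCompleted`) are names assumed for illustration, and the scheduler object is a hypothetical stand-in for the Scheduler parameter. The sketch also assumes the timer starts at subscription, so the first item is subject to the timeout as well.

```javascript
// Hypothetical virtual-time scheduler so the sketch runs deterministically.
function createVirtualScheduler() {
  let now = 0;
  let tasks = [];
  return {
    // Run fn after delayMs of virtual time; returns a cancel function.
    schedule(fn, delayMs) {
      const task = { at: now + delayMs, fn, cancelled: false };
      tasks.push(task);
      return () => { task.cancelled = true; };
    },
    // Advance virtual time, running due tasks in time order.
    advanceBy(ms) {
      const target = now + ms;
      for (;;) {
        const due = tasks
          .filter(t => !t.cancelled && t.at <= target)
          .sort((a, b) => a.at - b.at)[0];
        if (!due) break;
        tasks = tasks.filter(t => t !== due);
        now = due.at;
        due.fn();
      }
      now = target;
    }
  };
}

// source is a function that takes an observer and starts producing items.
function withTimeout(source, dueMs, scheduler) {
  return function subscribe(observer) {
    let done = false;
    let cancelTimer = () => {};
    function restartTimer() {
      cancelTimer();
      cancelTimer = scheduler.schedule(() => {
        if (done) return;
        done = true;
        observer.onError(new Error('TimeoutException'));
      }, dueMs);
    }
    restartTimer(); // assumption: the first item is timed too
    source({
      onNext(x) { if (done) return; restartTimer(); observer.onNext(x); },
      onError(e) { if (done) return; done = true; cancelTimer(); observer.onError(e); },
      onCompleted() { if (done) return; done = true; cancelTimer(); observer.onCompleted(); }
    });
  };
}

// Items arrive at 100 ms, 200 ms and 900 ms; the allowed gap is 300 ms.
const virtualScheduler = createVirtualScheduler();
const timeoutLog = [];
const timedSource = observer => {
  [[100, 'a'], [200, 'b'], [900, 'c']].forEach(([at, value]) =>
    virtualScheduler.schedule(() => observer.onNext(value), at));
};
withTimeout(timedSource, 300, virtualScheduler)({
  onNext: x => timeoutLog.push('Next: ' + x),
  onError: e => timeoutLog.push('Error: ' + e.message),
  onCompleted: () => timeoutLog.push('Completed')
});
virtualScheduler.advanceBy(1000);
console.log(timeoutLog.join('\n'));
```

Because the scheduler is passed in, the same operator logic could run against real timers (for example a `schedule` built on `setTimeout`) or, as here, against virtual time for deterministic tests.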

timeout

A second variant of timeout differs from the first in that, rather than issuing an error notification when a timeout occurs, it immediately switches to a backup Observable that you specify.

By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional final parameter to timeout, after the backup Observable.
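To make the switch-over concrete, here is a small model of the backup variant on a virtual timeline, written in plain JavaScript. It is a sketch under simplifying assumptions rather than the library's code: `simulateTimeoutWithBackup` is a hypothetical name, the source is described by a list of arrival times, the backup is a plain array that emits immediately and then completes, and a gap exactly equal to the duration is treated as on time.

```javascript
// items: [{ at: ms since subscription, value }], sorted by `at`.
// completeAt: time at which the source would complete normally.
function simulateTimeoutWithBackup(items, completeAt, dueMs, backupValues) {
  const out = [];
  let last = 0; // time of subscription or of the most recent item
  for (const { at, value } of items) {
    if (at - last > dueMs) {
      // The timer expired first: abandon the source and replay the backup.
      return out.concat(backupValues, ['completed']);
    }
    out.push(value);
    last = at;
  }
  // The gap before completion is timed as well.
  if (completeAt - last > dueMs) {
    return out.concat(backupValues, ['completed']);
  }
  return out.concat(['completed']);
}

const backupResult = simulateTimeoutWithBackup(
  [{ at: 100, value: 1 }, { at: 250, value: 2 }, { at: 900, value: 3 }],
  950,   // source would complete at 950 ms
  300,   // allowed gap between items
  [-1]   // backup sequence
);
console.log(backupResult); // → [1, 2, -1, 'completed']
```

Note that the item scheduled for 900 ms never appears: once the timeout fires, the source is abandoned and only the backup's items follow.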

timeout

A third variant of timeout does not use a constant timeout duration. Instead, it sets the duration on a per-item basis: it passes each item from the source Observable into a function that returns an Observable, and then monitors that Observable. If any such Observable completes before the source Observable emits another item, this is considered a timeout condition, and the Observable that timeout returns issues an onError notification (a TimeoutException).

This variant of timeout by default runs on the immediate Scheduler.
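The per-item rule can be illustrated with a timeline model in plain JavaScript. This is only a sketch: `simulatePerItemTimeout` is a hypothetical name, and `timeoutFor(value)` returns a number of milliseconds as a stand-in for the Observable that the real selector function would return (the deadline is the moment that Observable would complete).

```javascript
// items: [{ at: ms since subscription, value }], sorted by `at`.
// completeAt: time at which the source would complete normally.
// timeoutFor(value): ms allowed after `value` before the next notification.
function simulatePerItemTimeout(items, completeAt, timeoutFor) {
  const out = [];
  let deadline = Infinity; // this variant has no timer for the first item
  for (const { at, value } of items) {
    if (at > deadline) {
      return out.concat(['error: TimeoutException']);
    }
    out.push(value);
    deadline = at + timeoutFor(value); // each item sets its own deadline
  }
  return out.concat([completeAt > deadline ? 'error: TimeoutException' : 'completed']);
}

// 'slow' items grant the next item 500 ms; every other item grants 100 ms.
const perItemResult = simulatePerItemTimeout(
  [
    { at: 100, value: 'fast' },
    { at: 150, value: 'slow' },
    { at: 600, value: 'fast' },
    { at: 900, value: 'late' }
  ],
  1000,
  value => (value === 'slow' ? 500 : 100)
);
console.log(perItemResult); // → ['fast', 'slow', 'fast', 'error: TimeoutException']
```

The 450 ms gap after 'slow' is tolerated because that item granted 500 ms, while the 300 ms gap after the second 'fast' exceeds its 100 ms allowance.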

timeout

There is also a variant of timeout that both uses a per-item Observable to set the timeout duration and switches to a backup Observable in case of a timeout.

This variant of timeout by default runs on the immediate Scheduler.

timeout

The per-item variant of timeout also has a form that accepts an additional function. This function returns an Observable that acts as a timeout timer for the very first item emitted by the source Observable (without it, there is no timeout for the first item).

This variant of timeout by default runs on the immediate Scheduler.
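The effect of the first-item timer can be seen by comparing two runs of a simple timeline model in plain JavaScript. As before this is a hedged sketch, not library code: `simulateWithFirstTimeout` is a hypothetical name, durations in milliseconds stand in for the timer Observables, and the model stops at the last listed item without modeling completion.

```javascript
// items: [{ at: ms since subscription, value }], sorted by `at`.
// firstTimeoutMs: time allowed for the first item (Infinity = no first timer).
// timeoutFor(value): ms allowed after `value` before the next item.
function simulateWithFirstTimeout(items, firstTimeoutMs, timeoutFor) {
  const out = [];
  let deadline = firstTimeoutMs;
  for (const { at, value } of items) {
    if (at > deadline) {
      return out.concat(['error: TimeoutException']);
    }
    out.push(value);
    deadline = at + timeoutFor(value);
  }
  return out;
}

// The source is slow to start: its first item arrives after 400 ms.
const lateStart = [{ at: 400, value: 'a' }, { at: 500, value: 'b' }];
const allow300 = () => 300;

const guarded = simulateWithFirstTimeout(lateStart, 250, allow300);
const unguarded = simulateWithFirstTimeout(lateStart, Infinity, allow300);
console.log(guarded);   // → ['error: TimeoutException']
console.log(unguarded); // → ['a', 'b']
```

With a 250 ms first-item timer the slow start is reported as a timeout; with no first-item timer the same source is allowed to start late.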

timeout

And that variant also has a cousin that will switch to a specified backup Observable rather than emitting an error upon hitting a timeout condition.

This variant of timeout by default runs on the immediate Scheduler.

RxJava implements this operator as timeout, in several variants.

timeout

The first variant accepts parameters that define a duration of time (a quantity of time, and a TimeUnit that this quantity is denominated in). Each time the source Observable emits an item, timeout starts a timer, and if that timer exceeds the duration before the source Observable emits another item, timeout terminates its Observable with an error (TimeoutException).

By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to timeout.

timeout

A second variant of timeout differs from the first in that, rather than issuing an error notification when a timeout occurs, it immediately switches to a backup Observable that you specify.

By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional final parameter to timeout, after the backup Observable.

timeout

A third variant of timeout does not use a constant timeout duration. Instead, it sets the duration on a per-item basis: it passes each item from the source Observable into a function that returns an Observable, and then monitors that Observable. If any such Observable completes before the source Observable emits another item, this is considered a timeout condition, and the Observable that timeout returns issues an onError notification (a TimeoutException).

This variant of timeout by default runs on the immediate Scheduler.

timeout

There is also a variant of timeout that both uses a per-item Observable to set the timeout duration and switches to a backup Observable in case of a timeout.

This variant of timeout by default runs on the immediate Scheduler.

timeout

The per-item variant of timeout also has a form that accepts an additional function. This function returns an Observable that acts as a timeout timer for the very first item emitted by the source Observable (without it, there is no timeout for the first item).

This variant of timeout by default runs on the immediate Scheduler.

timeout

And that variant also has a cousin that will switch to a specified backup Observable rather than emitting an error upon hitting a timeout condition.

This variant of timeout by default runs on the immediate Scheduler.

RxJS implements this operator as timeout and timeoutWithSelector:

timeout

One variant of timeout accepts a duration of time (in milliseconds). Each time the source Observable emits an item, timeout starts a timer, and if that timer exceeds the duration before the source Observable emits another item, timeout terminates its Observable with an error (“Timeout” or a string of your choice that you pass as an optional second parameter).

Sample Code

// Emit 42 after a 5-second delay; the 200 ms timeout expires first.
var source = Rx.Observable
    .return(42)
    .delay(5000)
    .timeout(200, 'Timeout has occurred.');

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Output

Error: Timeout has occurred.

timeout

Another variant allows you to instruct timeout to switch to a backup Observable that you specify, rather than terminating with an error, if the timeout condition is triggered. To use this variant, pass the backup Observable (or Promise) as the second parameter to timeout.

Sample Code

// Same delayed source, but on timeout switch to a backup Promise
// instead of terminating with an error.
var source = Rx.Observable
  .return(42)
  .delay(5000)
  .timeout(200, Promise.resolve(42));

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Output

Next: 42
Completed

timeoutWithSelector

timeoutWithSelector does not use a constant timeout duration. Instead, it sets the duration on a per-item basis: it passes each item from the source Observable into a function that returns an Observable, and then monitors that Observable. If any such Observable completes before the source Observable emits another item, this is considered a timeout condition, and the Observable that timeoutWithSelector returns issues an onError notification (“Error: Timeout”).

Sample Code

// Delays (in milliseconds) before each successive item.
var array = [
    200,
    300,
    350,
    400
];

var source = Rx.Observable
    .for(array, function (x) {
        return Rx.Observable.timer(x);
    })
    .map(function (x, i) { return i; })
    // After each item, allow 400 ms for the next one to arrive.
    .timeoutWithSelector(function (x) {
        return Rx.Observable.timer(400);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Output

Next: 0
Next: 1
Next: 2
Error: Error: Timeout

timeoutWithSelector

There is also a variant of timeoutWithSelector that both uses a per-item Observable to set the timeout duration and switches to a backup Observable in case of a timeout.

Sample Code

// Delays (in milliseconds) before each successive item.
var array = [
    200,
    300,
    350,
    400
];

var source = Rx.Observable
    .for(array, function (x) {
        return Rx.Observable.timer(x);
    })
    .map(function (x, i) { return i; })
    // After each item, allow 400 ms for the next one;
    // on timeout, switch to the backup Observable.
    .timeoutWithSelector(function (x) {
        return Rx.Observable.timer(400);
    }, Rx.Observable.return(42));

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Output

Next: 0
Next: 1
Next: 2
Next: 42
Completed

timeoutWithSelector

The per-item variant of timeoutWithSelector also has a form that accepts an Observable that acts as a timeout timer for the very first item emitted by the source Observable. Without it, there is no timeout for the first item; that is to say, the default Observable that governs this first timeout period is Rx.Observable.never().

Sample Code

// Delays (in milliseconds) before each successive item.
var array = [
    200,
    300,
    350,
    400
];

var source = Rx.Observable
    .for(array, function (x) {
        return Rx.Observable.timer(x);
    })
    .map(function (x, i) { return i; })
    // Allow 250 ms for the first item, then 400 ms after each item.
    .timeoutWithSelector(Rx.Observable.timer(250), function (x) {
        return Rx.Observable.timer(400);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Output

Next: 0
Next: 1
Next: 2
Error: Error: Timeout

timeoutWithSelector

And that variant also has a cousin that will switch to a specified backup Observable rather than emitting an error upon hitting a timeout condition.

Sample Code

// Delays (in milliseconds) before each successive item.
var array = [
    200,
    300,
    350,
    400
];

var source = Rx.Observable
    .for(array, function (x) {
        return Rx.Observable.timer(x);
    })
    .map(function (x, i) { return i; })
    // Allow 250 ms for the first item, then 400 ms after each item;
    // on timeout, switch to the backup Observable.
    .timeoutWithSelector(Rx.Observable.timer(250), function (x) {
        return Rx.Observable.timer(400);
    }, Rx.Observable.return(42));

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Output

Next: 0
Next: 1
Next: 2
Next: 42
Completed

timeout and timeoutWithSelector are found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.time.js
  • rx.lite.js
  • rx.lite.compat.js

They require one of the following distributions:

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP implements this operator as timeout.

This operator terminates the observable sequence with an error if no item is emitted within the specified time. When a timeout occurs, the error is an instance of Rx\Exception\TimeoutException.

Sample Code

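// From https://github.com/ReactiveX/RxPHP/blob/master/demo/timeout/timeout.php
// $createStdoutObserver is a helper defined outside this snippet
// (assumed to come from the demo's setup code).

// Emits every 1000 ms, so the 500 ms timeout fires before the first item.
Rx\Observable::interval(1000)
    ->timeout(500)
    ->subscribe($createStdoutObserver('One second - '));

// Emits every 100 ms and completes after three items, within the timeout.
Rx\Observable::interval(100)
    ->take(3)
    ->timeout(500)
    ->subscribe($createStdoutObserver('100 ms     - '));

Output

100 ms     - Next value: 0
100 ms     - Next value: 1
100 ms     - Next value: 2
100 ms     - Complete!
One second - Exception: timeout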
