The Timeout operator allows you to abort an Observable with an onError termination if that Observable fails to emit any items during a specified span of time.
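The timing rule can be modelled without any Rx library. The following is a minimal sketch in plain JavaScript that replays a list of emission timestamps in virtual time. The function name `simulateTimeout`, its parameters, and the tie-breaking choice (a timeout wins when it coincides with an item) are assumptions made for illustration, not part of any Rx implementation.

```javascript
// Virtual-time model of the Timeout rule; plain JavaScript, no Rx library involved.
// Hypothetical inputs: `emissions` is a list of { time, value } sorted by time (ms
// since subscription), `completeAt` is when the source completes, `due` is the timeout.
function simulateTimeout(emissions, completeAt, due) {
  const events = [];
  let deadline = due; // assumption: the first timer starts at subscription (time 0)

  for (const { time, value } of emissions) {
    if (time >= deadline) {
      // Assumption: when an item and the timer coincide, the timeout wins.
      events.push(`Error at ${deadline}: Timeout`);
      return events;
    }
    events.push(`Next at ${time}: ${value}`);
    deadline = time + due; // each emitted item restarts the timer
  }

  events.push(completeAt >= deadline
    ? `Error at ${deadline}: Timeout`
    : `Completed at ${completeAt}`);
  return events;
}

// A source that emits 42 after 5000 ms, guarded by a 200 ms timeout.
const slowSource = simulateTimeout([{ time: 5000, value: 42 }], 5000, 200);
console.log(slowSource.join('\n'));

// A source that emits every 100 ms stays within a 500 ms timeout.
const fastSource = simulateTimeout(
  [{ time: 100, value: 0 }, { time: 200, value: 1 }, { time: 300, value: 2 }],
  300,
  500
);
console.log(fastSource.join('\n'));
```

In this model the slow source is cut off at 200 ms, long before its only item would arrive, while the fast source runs to completion because every item arrives before the current deadline.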
RxGroovy implements this operator as timeout, but in several variants.
The first variant accepts parameters that define a duration of time (a quantity of time, and a TimeUnit that this quantity is denominated in). Each time the source Observable emits an item, timeout starts a timer, and if that timer exceeds the duration before the source Observable emits another item, timeout terminates its Observable with an error (TimeoutException).
By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to timeout.
timeout(long,TimeUnit)
timeout(long,TimeUnit,Scheduler)
A second variant of timeout differs from the first in that instead of issuing an error notification in case of a timeout condition, it immediately switches to a backup Observable that you specify.
By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional fourth parameter to timeout.
timeout(long,TimeUnit,Observable)
timeout(long,TimeUnit,Observable,Scheduler)
A third variant of timeout does not use a constant timeout duration, but sets its timeout duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. If any such Observable completes before the source Observable emits another item, this is considered a timeout condition, and triggers an onError notification (“TimeoutException”) from the Observable timeout returns.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func1)
There is also a variant of timeout that both uses a per-item Observable to set the timeout duration and switches to a backup Observable in case of a timeout.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func1,Observable)
The per-item variant of timeout has a further variant that also accepts a function returning an Observable that acts as the timeout timer for the very first item emitted by the source Observable (without it, there would be no timeout for the first item).
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func0,Func1)
And that variant also has a cousin that will switch to a specified backup Observable rather than emitting an error upon hitting a timeout condition.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func0,Func1,Observable)
RxJava implements this operator as timeout, but in several variants.
The first variant accepts parameters that define a duration of time (a quantity of time, and a TimeUnit that this quantity is denominated in). Each time the source Observable emits an item, timeout starts a timer, and if that timer exceeds the duration before the source Observable emits another item, timeout terminates its Observable with an error (TimeoutException).
By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to timeout.
timeout(long,TimeUnit)
timeout(long,TimeUnit,Scheduler)
A second variant of timeout differs from the first in that instead of issuing an error notification in case of a timeout condition, it immediately switches to a backup Observable that you specify.
By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional fourth parameter to timeout.
timeout(long,TimeUnit,Observable)
timeout(long,TimeUnit,Observable,Scheduler)
A third variant of timeout does not use a constant timeout duration, but sets its timeout duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. If any such Observable completes before the source Observable emits another item, this is considered a timeout condition, and triggers an onError notification (“TimeoutException”) from the Observable timeout returns.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func1)
There is also a variant of timeout that both uses a per-item Observable to set the timeout duration and switches to a backup Observable in case of a timeout.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func1,Observable)
The per-item variant of timeout has a further variant that also accepts a function returning an Observable that acts as the timeout timer for the very first item emitted by the source Observable (without it, there would be no timeout for the first item).
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func0,Func1)
And that variant also has a cousin that will switch to a specified backup Observable rather than emitting an error upon hitting a timeout condition.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func0,Func1,Observable)
RxJS implements this operator as timeout and timeoutWithSelector:
One variant of timeout accepts a duration of time (in milliseconds). Each time the source Observable emits an item, timeout starts a timer, and if that timer exceeds the duration before the source Observable emits another item, timeout terminates its Observable with an error (“Timeout” or a string of your choice that you pass as an optional second parameter).
    var source = Rx.Observable
      .return(42)
      .delay(5000)
      .timeout(200, 'Timeout has occurred.');

    var subscription = source.subscribe(
      function (x) { console.log('Next: %s', x); },
      function (err) { console.log('Error: %s', err); },
      function () { console.log('Completed'); });

    Error: Timeout has occurred.
Another variant allows you to instruct timeout to switch to a backup Observable that you specify, rather than terminating with an error, if the timeout condition is triggered. To use this variant, pass the backup Observable (or Promise) as the second parameter to timeout.
    var source = Rx.Observable
      .return(42)
      .delay(5000)
      .timeout(200, Promise.resolve(42));

    var subscription = source.subscribe(
      function (x) { console.log('Next: %s', x); },
      function (err) { console.log('Error: %s', err); },
      function () { console.log('Completed'); });

    Next: 42
    Completed
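The switch to a backup can be pictured with a small virtual-time model. This is a hedged sketch in plain JavaScript, not RxJS code: `simulateTimeoutWithBackup` and its parameters are hypothetical names, and the backup is reduced to a plain list of values that is emitted in full once the timeout fires.

```javascript
// Virtual-time model of "switch to a backup on timeout"; plain JavaScript, no Rx.
// Hypothetical inputs: `emissions` is a list of { time, value } sorted by time (ms),
// `completeAt` is when the source completes, `backupValues` is what the backup emits.
function simulateTimeoutWithBackup(emissions, completeAt, due, backupValues) {
  const output = [];
  let deadline = due; // assumption: the first timer starts at subscription

  // On timeout, the rest of the output comes from the backup, not the source.
  const switchToBackup = () => {
    for (const value of backupValues) output.push(`Next: ${value}`);
    output.push('Completed');
    return output;
  };

  for (const { time, value } of emissions) {
    if (time >= deadline) return switchToBackup();
    output.push(`Next: ${value}`);
    deadline = time + due; // each emitted item restarts the timer
  }
  if (completeAt >= deadline) return switchToBackup();
  output.push('Completed');
  return output;
}

// The source item arrives at 5000 ms, far beyond the 200 ms timeout,
// so only the backup's value reaches the observer.
const backupRun = simulateTimeoutWithBackup(
  [{ time: 5000, value: 'late' }], 5000, 200, [42]);
console.log(backupRun.join('\n'));
```

The observer in this model never sees an error: the timeout is replaced by the backup's items and its completion.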
timeoutWithSelector does not use a constant timeout duration, but sets its timeout duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. If any such Observable completes before the source Observable emits another item, this is considered a timeout condition, and triggers an onError notification (“Error: Timeout”) from the Observable timeoutWithSelector returns.
    var array = [ 200, 300, 350, 400 ];

    var source = Rx.Observable
      .for(array, function (x) { return Rx.Observable.timer(x); })
      .map(function (x, i) { return i; })
      .timeoutWithSelector(function (x) { return Rx.Observable.timer(400); });

    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: 0
    Next: 1
    Next: 2
    Error: Error: Timeout
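To see why the fourth item is lost in the example above, it can help to replay the timings by hand. The sketch below is a plain JavaScript model under stated assumptions, not the RxJS implementation: `simulatePerItemTimeout` is a hypothetical name, the per-item function returns a number of milliseconds in place of an Observable such as `Rx.Observable.timer(ms)`, the source is assumed to complete together with its last item, and a timeout is assumed to win when it coincides with an item. The emission times 200, 500, 850 and 1250 are the running totals of the delays 200, 300, 350 and 400.

```javascript
// Virtual-time model of a per-item timeout; plain JavaScript, no Rx library.
// `timeoutFor(value)` returns the number of ms allowed before the NEXT item,
// standing in for the Observable that the real selector would return.
function simulatePerItemTimeout(emissions, timeoutFor) {
  const output = [];
  let deadline = Infinity; // no timer runs until the first item arrives

  for (const { time, value } of emissions) {
    if (time >= deadline) {
      output.push('Error: Timeout');
      return output;
    }
    output.push(`Next: ${value}`);
    deadline = time + timeoutFor(value); // a fresh deadline for every item
  }
  output.push('Completed'); // assumption: the source completes with its last item
  return output;
}

const timedEmissions = [
  { time: 200, value: 0 },
  { time: 500, value: 1 },
  { time: 850, value: 2 },
  { time: 1250, value: 3 }
];

// 400 ms per item: the gap before the last item is exactly 400 ms, so it times out.
const strictRun = simulatePerItemTimeout(timedEmissions, () => 400);
console.log(strictRun.join('\n'));

// 500 ms per item: every gap fits, so the sequence completes normally.
const relaxedRun = simulatePerItemTimeout(timedEmissions, () => 500);
console.log(relaxedRun.join('\n'));
```

Because the deadline is recomputed from each item, the function could just as well return a different allowance per value, which is the point of the selector-based variant.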
There is also a variant of timeoutWithSelector that both uses a per-item Observable to set the timeout duration and switches to a backup Observable in case of a timeout.
    var array = [ 200, 300, 350, 400 ];

    var source = Rx.Observable
      .for(array, function (x) { return Rx.Observable.timer(x); })
      .map(function (x, i) { return i; })
      .timeoutWithSelector(
        function (x) { return Rx.Observable.timer(400); },
        Rx.Observable.return(42));

    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: 0
    Next: 1
    Next: 2
    Next: 42
    Completed
The per-item variant of timeoutWithSelector has a further variant that lets you pass in an Observable that acts as the timeout timer for the very first item emitted by the source Observable. Without it there would be no timeout for the first item; that is to say, the default Observable that governs this first timeout period is Rx.Observable.never().
    var array = [ 200, 300, 350, 400 ];

    var source = Rx.Observable
      .for(array, function (x) { return Rx.Observable.timer(x); })
      .map(function (x, i) { return i; })
      .timeoutWithSelector(
        Rx.Observable.timer(250),
        function (x) { return Rx.Observable.timer(400); });

    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: 0
    Next: 1
    Next: 2
    Error: Error: Timeout
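The effect of the first-item timer is easiest to see with a source whose first item is slow. The following plain JavaScript sketch is a model under assumptions rather than RxJS code: `simulateWithFirstTimeout` is a hypothetical name, timers are plain millisecond counts, `Infinity` stands in for `Rx.Observable.never()`, and the source is assumed to complete together with its last item.

```javascript
// Virtual-time model of a first-item timeout; plain JavaScript, no Rx library.
// `firstTimeout` is the allowance (ms) before the first item; Infinity means "never".
// `timeoutFor(value)` is the allowance (ms) before each following item.
function simulateWithFirstTimeout(emissions, firstTimeout, timeoutFor) {
  const output = [];
  let deadline = firstTimeout; // this timer starts at subscription

  for (const { time, value } of emissions) {
    if (time >= deadline) {
      output.push('Error: Timeout');
      return output;
    }
    output.push(`Next: ${value}`);
    deadline = time + timeoutFor(value);
  }
  output.push('Completed'); // assumption: the source completes with its last item
  return output;
}

const lateFirstItem = [{ time: 300, value: 0 }];

// With a 250 ms first-item timer, the item at 300 ms arrives too late.
const guardedRun = simulateWithFirstTimeout(lateFirstItem, 250, () => 400);
console.log(guardedRun.join('\n'));

// With the default of "never", the first item may take as long as it likes.
const unguardedRun = simulateWithFirstTimeout(lateFirstItem, Infinity, () => 400);
console.log(unguardedRun.join('\n'));
```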
And that variant also has a cousin that will switch to a specified backup Observable rather than emitting an error upon hitting a timeout condition.
    var array = [ 200, 300, 350, 400 ];

    var source = Rx.Observable
      .for(array, function (x) { return Rx.Observable.timer(x); })
      .map(function (x, i) { return i; })
      .timeoutWithSelector(
        Rx.Observable.timer(250),
        function (x) { return Rx.Observable.timer(400); },
        Rx.Observable.return(42));

    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: 0
    Next: 1
    Next: 2
    Next: 42
    Completed
timeout and timeoutWithSelector are found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.time.js
rx.lite.js
rx.lite.compat.js
They require one of the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxPHP implements this operator as timeout.
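    // from https://github.com/ReactiveX/RxPHP/blob/master/demo/timeout/timeout.php
    // Note: $createStdoutObserver is not defined in this excerpt; it is presumably
    // a helper provided elsewhere in the RxPHP demo code.

    $loop = new \React\EventLoop\StreamSelectLoop();
    $scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

    // Emits once per second, so the 500 ms timeout fires before the first item.
    Rx\Observable::interval(1000)
        ->timeout(500)
        ->subscribe($createStdoutObserver("One second - "), $scheduler);

    // Emits every 100 ms and completes after three items, well within the timeout.
    Rx\Observable::interval(100)
        ->take(3)
        ->timeout(500)
        ->subscribe($createStdoutObserver("100 ms - "), $scheduler);

    $loop->run();

    100 ms - Next value: 0
    100 ms - Next value: 1
    100 ms - Next value: 2
    100 ms - Complete!
    One second - Exception: timeout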