Delay

shift the emissions from an Observable forward in time by a particular amount

The Delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable’s items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment.

See Also

Language-Specific Information:

RxGroovy implements this operator as variants of delay and delaySubscription.

delay

The first variant of delay accepts parameters that define a duration of time (a quantity of time, and a TimeUnit that this quantity is denominated in). Each time the source Observable emits an item, delay starts a timer, and when that timer reaches the given duration, the Observable returned from delay emits the same item.

Note that delay will not time-shift an onError notification in this fashion but it will forward such a notification immediately to its subscribers while dropping any pending onNext notifications. It will however time shift an onCompleted notification.

By default this variant of delay operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to delay

delay

Another variant of delay does not use a constant delay duration, but sets its delay duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. When any such Observable emits a item or completes, the Observable returned by delay emits the associated item.

This variant of delay does not by default run on any particular Scheduler.

delay

The variant of delay that uses a per-item Observable to set the delay has a variant that allows you to pass in a function that returns an Observable that acts as a delay timer for the subscription to the source Observable (in the absence of this, delay subscribes to the source Observable as soon as an observer subscribes to the Observable returned by delay).

This variant of delay does not by default run on any particular Scheduler.

delaySubscription

There is also an operator with which you can delay the subscription to the source Observable: delaySubscription. It accepts parameters that define the amount of time to delay (a quantity of time, and a TimeUnit that this quantity is denominated in).

This variant of delay by default runs on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to delaySubscription.

delaySubscription

And there is a variant of delaySubscription that uses an Observable (returned by a function you supply) rather than a fixed duration in order to set the subscription delay.

This variant of delaySubscription does not by default run on any particular Scheduler.

RxJava implements this operator as variants of delay and delaySubscription.

delay

The first variant of delay accepts parameters that define a duration of time (a quantity of time, and a TimeUnit that this quantity is denominated in). Each time the source Observable emits an item, delay starts a timer, and when that timer reaches the given duration, the Observable returned from delay emits the same item.

Note that delay will not time-shift an onError notification in this fashion but it will forward such a notification immediately to its subscribers while dropping any pending onNext notifications. It will however time shift an onCompleted notification.

By default this variant of delay operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to delay

delay

Another variant of delay does not use a constant delay duration, but sets its delay duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. When any such Observable emits an item or completes, the Observable returned by delay emits the associated item.

This variant of delay does not by default run on any particular Scheduler.

delay

The variant of delay that uses a per-item Observable to set the delay has a variant that allows you to pass in a function that returns an Observable that acts as a delay timer for the subscription to the source Observable (in the absence of this, delay subscribes to the source Observable as soon as an observer subscribes to the Observable returned by delay).

This variant of delay does not by default run on any particular Scheduler.

delaySubscription

There is also an operator with which you can delay the subscription to the source Observable: delaySubscription. It accepts parameters that define the amount of time to delay (a quantity of time, and a TimeUnit that this quantity is denominated in).

This variant of delay by default runs on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to delaySubscription.

delaySubscription

And there is a variant of delaySubscription that uses an Observable (returned by a function you supply) rather than a fixed duration in order to set the subscription delay.

This variant of delaySubscription does not by default run on any particular Scheduler.

delay

In RxJS you can set the per-item delay in two ways: by passing a number of milliseconds into the delay operator (which will delay each emission by that amount of time), or by passing in a Date object (which will delay the beginning of the sequence of emissions until that absolute point in time).

This operator operates by default on the timeout Scheduler, but you can override this by passing in another Scheduler as an optional second parameter.

Sample Code

var source = Rx.Observable.range(0, 3)
    .delay(new Date(Date.now() + 1000));

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x.toString()); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); }); 
Next: 0
Next: 1
Next: 2
Completed
var source = Rx.Observable.range(0, 3)
    .delay(1000);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x.toString()); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Completed
delaySubscription

delaySubscription is similar to delay but rather than timeshifting the emissions from the source Observable, it timeshifts the moment of subscription to that Observable. You pass to this operator a time value (either a Number, in which case this sets the number of milliseconds of delay, or a Date, in which case this sets an absolute future time at which delaySubscription will trigger the subscription). You may optionally pass a Scheduler as a second parameter, which delaySubscription will use to govern the delay period or trigger time.

Sample Code

var start = Date.now();
var source = Rx.Observable.range(0, 3).delaySubscription(5000);

var subscription = source.subscribe(
    function (x) { console.log('Next: %s, %s', x, Date.now() - start); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0, 5001
Next: 1, 5002
Next: 2, 5003
Completed
delayWithSelector

delayWithSelector is like delay but does not use a constant delay duration (or absolute time), but sets its delay duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. When any such Observable completes, the Observable returned by delay emits the associated item.

Sample Code

var source = Rx.Observable
    .range(0, 3)
    .delayWithSelector(
        function (x) {
            return Rx.Observable.timer(x * 400);
        })
    .timeInterval()
    .map(function (x) { return x.value + ':' + x.interval; });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0:0
Next: 1:400
Next: 2:400
Completed
delayWithSelector

There is also a version of delayWithSelector that takes an additional (first) argument: an Observable that sets a delay before delayWithSelector subscribes to the source Observable after it itself is subscribed to.

Sample Code

var source = Rx.Observable
    .range(0, 3)
    .delayWithSelector(
        Rx.Observable.timer(300),
        function (x) {
            return Rx.Observable.timer(x * 400);
        }
    )
    .timeInterval()
    .map(function (x) { return x.value + ':' + x.interval; });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 0:300
Next: 1:400
Next: 2:400
Completed

delay, delaySubscription, and delayWithSelector require rx.lite.js or rx.lite.compat.js and are found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.time.js (requires rx.js or rx.compat.js)

RxPHP implements this operator as delay.

Time shifts the observable sequence by dueTime. The relative time intervals between the values are preserved.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/delay/delay.php

$loop = new \React\EventLoop\StreamSelectLoop();

$scheduler  = new \Rx\Scheduler\EventLoopScheduler($loop);

\Rx\Observable::interval(1000, $scheduler)
    ->doOnNext(function ($x) {
        echo "Side effect: " . $x . "\n";
    })
    ->delay(500)
    ->take(5)
    ->subscribe($createStdoutObserver(), $scheduler);

$loop->run();

   
Side effect: 0
Next value: 0
Side effect: 1
Next value: 1
Side effect: 2
Next value: 2
Side effect: 3
Next value: 3
Side effect: 4
Next value: 4
Complete!
    

TBD