Replay

Ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items.

A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.

If you apply the Replay operator to an Observable before you convert it into a connectable Observable, the resulting connectable Observable will always emit the same complete sequence to any future observers, even those observers that subscribe after the connectable Observable has begun to emit items to other subscribed observers.
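As a rough illustration of the behavior described above, the following plain-JavaScript sketch buffers every emitted item and replays that buffer to each new subscriber. It does not use any Rx library: `createReplayConnectable`, `emit`, `early`, and `late` are hypothetical names chosen for this example, and completion and error handling are deliberately left out.

```javascript
// Hypothetical minimal sketch of a replaying connectable source (not Rx code).
function createReplayConnectable(subscribeToSource) {
  const buffer = [];      // every item emitted so far
  const observers = [];   // callbacks of all current subscribers
  let connected = false;

  return {
    subscribe(onNext) {
      // A new subscriber first receives everything already emitted...
      buffer.forEach((item) => onNext(item));
      // ...and then receives live items like everyone else.
      observers.push(onNext);
    },
    connect() {
      // Nothing is emitted until connect() is called, and it only runs once.
      if (connected) return;
      connected = true;
      subscribeToSource((item) => {
        buffer.push(item);
        observers.forEach((onNext) => onNext(item));
      });
    },
  };
}

// A manually driven source: calling emit(x) plays the role of an emission.
let emit = null;
const replayed = createReplayConnectable((next) => { emit = next; });

const early = [];
const late = [];

replayed.subscribe((x) => early.push(x)); // subscribes before connect()
replayed.connect();
emit(0);
emit(1);
replayed.subscribe((x) => late.push(x));  // subscribes after two emissions
emit(2);

console.log(JSON.stringify(early)); // [0,1,2]
console.log(JSON.stringify(late));  // [0,1,2]
```

Both observers end up with the same sequence, although the second one subscribed after two items had already been emitted.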

Language-Specific Information:

replay

In RxGroovy there is a variety of the replay operator that returns a connectable Observable. You must Publish this connectable Observable before observers can subscribe to it, and then Connect to it in order to observe its emissions.

Variants of this variety of the replay operator permit you to set a maximum buffer size to limit the number of items replay will buffer and replay to subsequent observers, and/or to establish a moving time window that defines when emitted items become too old to buffer and replay.

replay

There is also a variety of replay that returns an ordinary Observable. These variants take as a parameter a transformative function; this function accepts an item emitted by the source Observable as its parameter, and returns an item to be emitted by the resulting Observable. So really, this operator does not replay the source Observable but instead replays the source Observable as transformed by this function.

Variants of this variety of the replay operator permit you to set a maximum buffer size to limit the number of items replay will buffer and replay to subsequent observers, and/or to establish a moving time window that defines when emitted items become too old to buffer and replay.

replay

In RxJava there is a variety of the replay operator that returns a connectable Observable. You must Publish this connectable Observable before observers can subscribe to it, and then Connect to it in order to observe its emissions.

Variants of this variety of the replay operator permit you to set a maximum buffer size to limit the number of items replay will buffer and replay to subsequent observers, and/or to establish a moving time window that defines when emitted items become too old to buffer and replay.

replay

There is also a variety of replay that returns an ordinary Observable. These variants take as a parameter a transformative function; this function accepts an item emitted by the source Observable as its parameter, and returns an item to be emitted by the resulting Observable. So really, this operator does not replay the source Observable but instead replays the source Observable as transformed by this function.

Variants of this variety of the replay operator permit you to set a maximum buffer size to limit the number of items replay will buffer and replay to subsequent observers, and/or to establish a moving time window that defines when emitted items become too old to buffer and replay.

replay

In RxJS the replay operator takes four optional parameters and returns an ordinary Observable:

  • selector: a transforming function that takes an item emitted by the source Observable as its parameter and returns an item to be emitted by the resulting Observable
  • bufferSize: the maximum number of items to buffer and replay to subsequent observers
  • window: the age, in milliseconds, at which items in this buffer may be discarded without being emitted to subsequent observers
  • scheduler: the Scheduler on which this operator will operate
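To make the interaction of bufferSize and window more concrete, here is a small sketch of a replay buffer that is trimmed both by item count and by age. It is an assumption-laden illustration, not RxJS code: `createReplayBuffer`, `add`, `snapshot`, and the injected `now` clock are hypothetical names, and the clock is advanced by hand so the result does not depend on real timers.

```javascript
// Hypothetical bounded replay buffer; not the RxJS implementation.
function createReplayBuffer(bufferSize, windowMs, now) {
  const entries = []; // { item, time } pairs, oldest first

  function trim() {
    // Enforce the maximum number of buffered items.
    while (entries.length > bufferSize) entries.shift();
    // Discard items that are older than the time window.
    while (entries.length > 0 && now() - entries[0].time > windowMs) {
      entries.shift();
    }
  }

  return {
    add(item) {
      entries.push({ item: item, time: now() });
      trim();
    },
    // The items a new subscriber would be given at this moment.
    snapshot() {
      trim();
      return entries.map((entry) => entry.item);
    },
  };
}

let clock = 0; // fake time in milliseconds, advanced manually
const replayBuffer = createReplayBuffer(3, 1000, () => clock);

replayBuffer.add('a');              // t = 0
clock = 400; replayBuffer.add('b'); // t = 400
clock = 800; replayBuffer.add('c'); // t = 800
clock = 900; replayBuffer.add('d'); // t = 900, size limit evicts 'a'

const trimmedBySize = replayBuffer.snapshot();
console.log(JSON.stringify(trimmedBySize)); // ["b","c","d"]

clock = 1500;                       // 'b' is now 1100 ms old
const trimmedByAge = replayBuffer.snapshot();
console.log(JSON.stringify(trimmedByAge));  // ["c","d"]
```

A subscriber arriving at t = 900 would be replayed three items, while one arriving at t = 1500 would be replayed only the two that are still inside the window.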

Sample Code

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .do(function (x) {
        console.log('Side effect');
    });

var published = source
    .replay(function (x) {
        return x.take(2).repeat(2);
    }, 3);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}

Side effect
Next: SourceA0
Side effect
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceA0
Next: SourceA1
Completed
Side effect
Next: SourceB1
Next: SourceB0
Next: SourceB1
Completed

There is also a shareReplay operator, which keeps track of the number of observers, and disconnects from the source Observable when that number drops to zero. shareReplay takes three optional parameters and returns an ordinary Observable:

  • bufferSize: the maximum number of items to buffer and replay to subsequent observers
  • window: the age, in milliseconds, at which items in this buffer may be discarded without being emitted to subsequent observers
  • scheduler: the Scheduler on which this operator will operate
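The observer counting that shareReplay performs can be sketched in plain JavaScript as follows. This is only an approximation under stated assumptions: `shareReplaySketch`, `source`, `emit`, and `activeConnections` are hypothetical names, the source is driven by hand, and completion, errors, the time window, and the scheduler are ignored.

```javascript
// Hypothetical reference-counted replay; not the RxJS implementation.
function shareReplaySketch(subscribeToSource, bufferSize) {
  const buffer = [];
  const observers = new Set();
  let disconnect = null;

  return {
    subscribe(onNext) {
      buffer.forEach((item) => onNext(item)); // replay cached items first
      observers.add(onNext);
      if (observers.size === 1) {
        // Observer count went from zero to one: connect to the source.
        disconnect = subscribeToSource((item) => {
          buffer.push(item);
          if (buffer.length > bufferSize) buffer.shift();
          observers.forEach((observer) => observer(item));
        });
      }
      return function unsubscribe() {
        observers.delete(onNext);
        if (observers.size === 0 && disconnect) {
          // Observer count dropped to zero: disconnect from the source.
          disconnect();
          disconnect = null;
        }
      };
    },
  };
}

let emit = null;
let activeConnections = 0;
const source = (next) => {
  activeConnections += 1;
  emit = next;
  return () => { activeConnections -= 1; emit = null; };
};

const shared = shareReplaySketch(source, 2);
const a = [];
const b = [];

const unsubscribeA = shared.subscribe((x) => a.push(x)); // connects
emit(1); emit(2); emit(3);
const unsubscribeB = shared.subscribe((x) => b.push(x)); // replays 2 and 3
emit(4);

unsubscribeA();
const connectionsWithOneObserver = activeConnections; // still connected
unsubscribeB();
const connectionsWithNoObservers = activeConnections; // disconnected

console.log(JSON.stringify(a)); // [1,2,3,4]
console.log(JSON.stringify(b)); // [2,3,4]
console.log(connectionsWithOneObserver, connectionsWithNoObservers); // 1 0
```

The sketch keeps its buffer after disconnecting, which mirrors the sample below where a late third subscription receives cached items without triggering new side effects.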

Sample Code

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(4)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source
    .shareReplay(3);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

// Creating a third subscription after the previous two subscriptions have
// completed. Notice that no side effects result from this subscription,
// because the notifications are cached and replayed.
Rx.Observable
    .return(true)
    .delay(6000)
    .flatMap(published)
    .subscribe(createObserver('SourceC'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}

Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Side effect
Next: SourceA2
Next: SourceB2
Side effect
Next: SourceA3
Next: SourceB3
Completed
Completed
Next: SourceC1
Next: SourceC2
Next: SourceC3
Completed

replay and shareReplay are found in the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (requires rx.js or rx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

RxPHP implements this operator as replay.

Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence. That connectable sequence shares a single subscription to the underlying sequence and replays notifications, subject to a maximum time length for the replay buffer. This operator is a specialization of Multicast using a ReplaySubject.
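The phrase "a specialization of Multicast using a ReplaySubject" can be pictured with the following language-neutral sketch, written in plain JavaScript rather than PHP. All names here (`createReplaySubjectSketch`, `multicastSketch`, `sourceSubscriptions`) are hypothetical and are not RxPHP APIs; the selector, buffer limits, and completion handling are omitted.

```javascript
// Hypothetical stand-in for a replay subject: it is both a sink and a source.
function createReplaySubjectSketch() {
  const buffer = [];
  const observers = [];
  return {
    next(item) {
      buffer.push(item);
      observers.forEach((onNext) => onNext(item));
    },
    subscribe(onNext) {
      buffer.forEach((item) => onNext(item)); // replay, then go live
      observers.push(onNext);
    },
  };
}

// Hypothetical multicast: observers subscribe to the subject, and the
// subject is subscribed to the source exactly once, when connect() is called.
function multicastSketch(subscribeToSource, subject) {
  let connected = false;
  return {
    subscribe(onNext) {
      subject.subscribe(onNext);
    },
    connect() {
      if (connected) return;
      connected = true;
      subscribeToSource((item) => subject.next(item));
    },
  };
}

let sourceSubscriptions = 0;
const underlying = (onNext) => {
  sourceSubscriptions += 1;          // counts subscriptions to the source
  [1, 2, 3].forEach((item) => onNext(item));
};

const replayedSequence = multicastSketch(underlying, createReplaySubjectSketch());
const first = [];
const second = [];

replayedSequence.subscribe((x) => first.push(x));  // receives items live
replayedSequence.connect();
replayedSequence.subscribe((x) => second.push(x)); // receives items by replay

console.log(JSON.stringify(first));  // [1,2,3]
console.log(JSON.stringify(second)); // [1,2,3]
console.log(sourceSubscriptions);    // 1
```

Both observers see the full sequence while the underlying source is subscribed to only once.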

Sample Code

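// from https://github.com/ReactiveX/RxPHP/blob/master/demo/replay/replay.php
// Note: $createStdoutObserver is not defined in this snippet; it is assumed
// to be provided by the RxPHP demo bootstrap.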

$loop      = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$interval = \Rx\Observable::interval(1000);

$source = $interval
    ->take(2)
    ->doOnNext(function ($x) {
        echo $x, " something", PHP_EOL;
        echo "Side effect", PHP_EOL;
    });

$published = $source
    ->replay(function (\Rx\Observable $x) {
        return $x->take(2)->repeat(2);
    }, 3);

$published->subscribe($createStdoutObserver('SourceA '), $scheduler);
$published->subscribe($createStdoutObserver('SourceB '), $scheduler);

$loop->run();

RxPHP also has an operator shareReplay.

Returns an observable sequence that shares a single subscription to the underlying sequence and replays notifications, subject to a maximum time length for the replay buffer. This operator is a specialization of replay: it creates a subscription when the number of observers goes from zero to one, shares that subscription with all subsequent observers until the number of observers returns to zero, and then disposes of the subscription.

Sample Code

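// from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareReplay.php
// Note: $createStdoutObserver is not defined in this snippet; it is assumed
// to be provided by the RxPHP demo bootstrap.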

$loop      = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$interval = Rx\Observable::interval(1000);

$source = $interval
    ->take(4)
    ->doOnNext(function ($x) {
        echo 'Side effect', PHP_EOL;
    });

$published = $source
    ->shareReplay(3);

$published->subscribe($createStdoutObserver('SourceA '), $scheduler);
$published->subscribe($createStdoutObserver('SourceB '), $scheduler);

Rx\Observable
    ::just(true)
    ->concatMapTo(\Rx\Observable::timer(6000))
    ->flatMap(function () use ($published) {
        return $published;
    })
    ->subscribe($createStdoutObserver('SourceC '), $scheduler);

$loop->run();

Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
Side effect
SourceA Next value: 2
SourceB Next value: 2
Side effect
SourceA Next value: 3
SourceB Next value: 3
SourceA Complete!
SourceB Complete!
SourceC Next value: 1
SourceC Next value: 2
SourceC Next value: 3
SourceC Complete!
