RefCount

make a Connectable Observable behave like an ordinary Observable

RefCount

A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.

The RefCount operator automates the process of connecting to and disconnecting from a connectable Observable. It operates on a connectable Observable and returns an ordinary Observable. When the first observer subscribes to this Observable, RefCount connects to the underlying connectable Observable. RefCount then keeps track of how many other observers subscribe to it and does not disconnect from the underlying connectable Observable until the last observer has done so.

See Also

Language-Specific Information:

refCount

RxGroovy implements this operator as refCount.

There is also a share operator, which is the equivalent of applying both the publish and refCount operators to an Observable, in that order.

refCount

RxJava implements this operator as refCount.

There is also a share operator, which is the equivalent of applying both the publish and refCount operators to an Observable, in that order.

refCount

RxJava implements this operator as refCount.

Sample Code

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) { console.log('Side effect'); });

var published = source.publish().refCount();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed

refCount is found in the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (requires rx.js, rx.compat.js, rx.lite.js, or rx.lite.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

There is also a share operator, which is the equivalent of applying both the publish and refCount operators to an Observable, in that order. A variant called shareValue takes as a parameter a single item that it will emit to any subscribers before beginning to emit items from the source Observable.

Sample Code

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .do(
        function (x) { console.log('Side effect'); });

var published = source.share();

// When the number of observers subscribed to published observable goes from
// 0 to 1, we connect to the underlying observable sequence.
published.subscribe(createObserver('SourceA'));
// When the second subscriber is added, no additional subscriptions are added to the
// underlying observable sequence. As a result the operations that result in side
// effects are not repeated per subscriber.
published.subscribe(createObserver('SourceB'));

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed

share and shareValue are found in the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (requires rx.js or rx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

RxPHP implements this operator as share.

Returns an observable sequence that shares a single subscription to the underlying sequence. This operator is a specialization of publish which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/share.php

$loop = \React\EventLoop\Factory::create();
$scheduler  = new \Rx\Scheduler\EventLoopScheduler($loop);

//With Share
$source = \Rx\Observable::interval(1000, $scheduler)
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->share();

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

$loop->run();


   
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
    

RxPHP also has an operator shareValue.

Returns an observable sequence that shares a single subscription to the underlying sequence and starts with an initialValue. This operator is a specialization of publishValue which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/share/shareValue.php

$loop = \React\EventLoop\Factory::create();
$scheduler  = new \Rx\Scheduler\EventLoopScheduler($loop);

$source = \Rx\Observable::interval(1000, $scheduler)
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->shareValue(42);

$published->subscribe($createStdoutObserver('SourceA '));
$published->subscribe($createStdoutObserver('SourceB '));

$loop->run();

   
SourceA Next value: 42
SourceB Next value: 42
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
    

TBD