Publish

convert an ordinary Observable into a connectable Observable

A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.
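The deferred start described above can be illustrated without any Rx library. What follows is only a minimal sketch in plain JavaScript: makeConnectable is a hypothetical helper invented for this illustration, not an Rx API, and it ignores errors, completion, and disposal.

```javascript
// Hypothetical helper (not part of any Rx library): wraps an array of items
// in an object that holds them back until connect() is called.
function makeConnectable(items) {
    var observers = [];
    return {
        // subscribe only registers the callback; nothing is emitted yet
        subscribe: function (onNext) { observers.push(onNext); },
        // connect pushes every item to all observers registered so far
        connect: function () {
            items.forEach(function (item) {
                observers.forEach(function (onNext) { onNext(item); });
            });
        }
    };
}

var log = [];
var connectable = makeConnectable([0, 1]);

connectable.subscribe(function (x) { log.push('A' + x); });
connectable.subscribe(function (x) { log.push('B' + x); });
log.push('subscribed');   // no items have been emitted at this point

connectable.connect();    // emission starts here
console.log(log.join(', '));
// prints: subscribed, A0, B0, A1, B1
```

Both observers see the same items in the same pass, and nothing reaches either of them until connect is called.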

Language-Specific Information:

RxGroovy implements this operator as publish.

There is also a variant that takes a function as a parameter. This function takes as a parameter the ConnectableObservable that shares a single subscription to the underlying Observable sequence, and it produces and returns a new Observable sequence.

RxJava implements this operator as publish.

There is also a variant that takes a function as a parameter. This function takes as a parameter the ConnectableObservable that shares a single subscription to the underlying Observable sequence. This function produces and returns a new Observable sequence.
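The idea behind the function-taking variant can be sketched in plain JavaScript. This is an illustration under stated assumptions, not RxJava code: publishWithSelector, the source object, and the subscriptionCount counter are all hypothetical names made up for the sketch, and errors and completion are ignored.

```javascript
// Hypothetical stand-in for a cold source: counts how often it is subscribed.
var subscriptionCount = 0;
var source = {
    subscribe: function (onNext) {
        subscriptionCount += 1;
        [1, 2, 3].forEach(function (x) { onNext(x); });
    }
};

// Hypothetical helper: hands a shared view of `source` to `selector`,
// subscribes the observer to whatever the selector returns, and only then
// opens the single subscription to the source.
function publishWithSelector(source, selector) {
    return {
        subscribe: function (onNext) {
            var listeners = [];
            var shared = {
                subscribe: function (fn) { listeners.push(fn); }
            };
            selector(shared).subscribe(onNext);
            source.subscribe(function (x) {
                listeners.forEach(function (fn) { fn(x); });
            });
        }
    };
}

// The selector uses the shared sequence twice and returns a new sequence.
var result = publishWithSelector(source, function (shared) {
    return {
        subscribe: function (onNext) {
            shared.subscribe(function (x) { onNext('value ' + x); });
            shared.subscribe(function (x) { onNext('square ' + x * x); });
        }
    };
});

var log = [];
result.subscribe(function (x) { log.push(x); });
console.log(log.join(', '));
// prints: value 1, square 1, value 2, square 4, value 3, square 9
console.log('source subscriptions: ' + subscriptionCount);
// prints: source subscriptions: 1
```

Although the selector subscribes to the shared sequence twice, the underlying source is subscribed to only once.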

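In RxJS, the publish operator optionally takes a selector function as a parameter. This function receives the multicasted source sequence, which it can use as many times as needed without causing multiple subscriptions to the source, and it returns an Observable sequence. When publish is called with no arguments, as in the sample below, it returns a ConnectableObservable.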

Sample Code

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publish();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}

Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed

publishValue

The publishValue operator takes, in addition to the function described above, an initial item to be emitted by the resulting ConnectableObservable at connection time before emitting the items from the source Observable. It will not, however, emit this initial item to observers that subscribe after the time of connection.

Sample Code

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publishValue(42);

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}

Next: SourceA42
Next: SourceB42
Side effect
Next: SourceA0
Next: SourceB0
Side effect
Next: SourceA1
Next: SourceB1
Completed
Completed

publishLast

The publishLast operator is similar to publish and takes a similarly behaving function as its parameter. It differs from publish in one respect: instead of emitting an item for every item the source Observable emits after the connection, it emits only the last item emitted by the source Observable, and it does so only when that source Observable terminates normally.
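The "last item only" behavior can be sketched in plain JavaScript. This is a rough illustration rather than the RxJS implementation: makeLastConnectable is a hypothetical helper, the source is modeled as a plain array, and error handling is left out.

```javascript
// Hypothetical helper (not an Rx API): observers receive only the final
// item, delivered once the whole source has been consumed.
function makeLastConnectable(items) {
    var observers = [];
    return {
        subscribe: function (onNext, onCompleted) {
            observers.push({ onNext: onNext, onCompleted: onCompleted });
        },
        connect: function () {
            var hasValue = false;
            var last;
            // consume the whole source first, remembering only the latest item
            items.forEach(function (item) {
                hasValue = true;
                last = item;
            });
            // on completion, hand the remembered item to each observer
            observers.forEach(function (o) {
                if (hasValue) { o.onNext(last); }
                o.onCompleted();
            });
        }
    };
}

var log = [];
var published = makeLastConnectable([0, 1]);

published.subscribe(
    function (x) { log.push('A next ' + x); },
    function () { log.push('A completed'); });
published.subscribe(
    function (x) { log.push('B next ' + x); },
    function () { log.push('B completed'); });

published.connect();
console.log(log.join(', '));
// prints: A next 1, A completed, B next 1, B completed
```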

Sample Code

var interval = Rx.Observable.interval(1000);

var source = interval
    .take(2)
    .doAction(function (x) {
        console.log('Side effect');
    });

var published = source.publishLast();

published.subscribe(createObserver('SourceA'));
published.subscribe(createObserver('SourceB'));

var connection = published.connect();

function createObserver(tag) {
    return Rx.Observer.create(
        function (x) { console.log('Next: ' + tag + x); },
        function (err) { console.log('Error: ' + err); },
        function () { console.log('Completed'); });
}

Side effect
Side effect
Next: SourceA1
Completed
Next: SourceB1
Completed

The above operators are available in the following packages:

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (requires either rx.js or rx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

RxJS also has a multicast operator, which operates on an ordinary Observable and multicasts it by means of a particular Subject that you specify. If you also supply a selector function, multicast applies that function to the multicasted sequence and emits the result as its own ordinary Observable sequence; each subscription to this new Observable triggers a new subscription to the underlying multicast Observable. If you pass only a Subject, as in the sample below, multicast returns a ConnectableObservable.
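Multicasting through a Subject can be sketched in plain JavaScript. The createSubject and multicast functions here are hypothetical stand-ins written for this illustration, not the RxJS versions, and they handle only onNext notifications.

```javascript
// Hypothetical minimal Subject: receives items through onNext and
// re-broadcasts each one to its own observers.
function createSubject() {
    var observers = [];
    return {
        subscribe: function (onNext) { observers.push(onNext); },
        onNext: function (x) { observers.forEach(function (fn) { fn(x); }); }
    };
}

// Hypothetical multicast: subscribers attach to the subject, and connect()
// subscribes the subject to the source exactly once.
function multicast(source, subject) {
    return {
        subscribe: function (onNext) { subject.subscribe(onNext); },
        connect: function () { source.subscribe(subject.onNext); }
    };
}

var source = {
    subscribe: function (onNext) {
        [0, 1, 2].forEach(function (x) { onNext(x); });
    }
};

var log = [];
var subject = createSubject();
var connectable = multicast(source, subject);

connectable.subscribe(function (x) { log.push('via connectable: ' + x); });
subject.subscribe(function (x) { log.push('via subject: ' + x); });

connectable.connect();
console.log(log.join('\n'));
```

Each source item reaches both callbacks, because both are ultimately registered on the same subject.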

Sample Code

var subject = new Rx.Subject();
var source = Rx.Observable.range(0, 3)
    .multicast(subject);

var observer = Rx.Observer.create(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); }
);

var subscription = source.subscribe(observer);
subject.subscribe(observer);

var connected = source.connect();

subscription.dispose();

Next: 0
Next: 0
Next: 1
Next: 1
Next: 2
Next: 2
Completed

The multicast operator is available in the following packages:

  • rx.all.js
  • rx.all.compat.js
  • rx.binding.js (requires either rx.js or rx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

There is also a let operator (the alias letBind is available for browsers such as Internet Explorer before IE9 where “let” is forbidden). It is similar to multicast but does not multicast the underlying Observable through a Subject:

Sample Code

var obs = Rx.Observable.range(1, 3);

var source = obs.let(function (o) { return o.concat(o); });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 1
Next: 2
Next: 3
Next: 1
Next: 2
Next: 3
Completed
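Because let does not route the source through a Subject, every use of the sequence inside the function is a separate subscription. A minimal plain-JavaScript sketch of that point follows; letBind and concat are hypothetical helpers written for this illustration (not the RxJS operators), and the source is synchronous so that the simplified concat stays in order.

```javascript
// Hypothetical cold source that counts its subscriptions.
var subscriptionCount = 0;
var obs = {
    subscribe: function (onNext) {
        subscriptionCount += 1;
        [1, 2, 3].forEach(function (x) { onNext(x); });
    }
};

// Hypothetical concat for synchronous sources: runs the first, then the second.
function concat(first, second) {
    return {
        subscribe: function (onNext) {
            first.subscribe(onNext);
            second.subscribe(onNext);
        }
    };
}

// Hypothetical letBind: simply applies the function to the source.
function letBind(source, selector) {
    return selector(source);
}

var source = letBind(obs, function (o) { return concat(o, o); });

var log = [];
source.subscribe(function (x) { log.push(x); });

console.log(log.join(', '));
// prints: 1, 2, 3, 1, 2, 3
console.log('source subscriptions: ' + subscriptionCount);
// prints: source subscriptions: 2
```

The count of two shows that the source was subscribed to once per use inside the function, rather than once overall.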

The let (or letBind) operator is available in the following packages:

  • rx.all.js
  • rx.all.compat.js
  • rx.experimental.js

It requires one of the following packages:

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP implements this operator as multicast.

Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a selector function. Each subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay.

Sample Code

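// From https://github.com/ReactiveX/RxPHP/blob/master/demo/multicast/multicast.php
// $stdoutObserver is not defined in this excerpt; it is assumed to come from the demo's setup code.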

$subject = new \Rx\Subject\Subject();
$source  = \Rx\Observable::range(0, 3)->multicast($subject);

$subscription = $source->subscribe($stdoutObserver);
$subject->subscribe($stdoutObserver);

$connected = $source->connect();

   
Next value: 0
Next value: 0
Next value: 1
Next value: 1
Next value: 2
Next value: 2
Complete!
    

RxPHP also has an operator multicastWithSelector.

Multicasts the source sequence notifications through an instantiated subject from a subject selector factory, into all uses of the sequence within a selector function. Each subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay.

RxPHP also has an operator publish.

Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence. This operator is a specialization of Multicast using a regular Subject.

Sample Code

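// From https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publish.php
// $createStdoutObserver is not defined in this excerpt; it is assumed to come from the demo's setup code.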

/* With publish */
$interval = \Rx\Observable::range(0, 10);

$source = $interval
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->publish();

$published->subscribe($createStdoutObserver('SourceC '));
$published->subscribe($createStdoutObserver('SourceD '));

$published->connect();

   
Side effect
SourceC Next value: 0
SourceD Next value: 0
Side effect
SourceC Next value: 1
SourceD Next value: 1
SourceC Complete!
SourceD Complete!
    

RxPHP also has an operator publishLast.

Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence containing only the last notification. This operator is a specialization of Multicast using an AsyncSubject.

Sample Code

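// From https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishLast.php
// $createStdoutObserver is not defined in this excerpt; it is assumed to come from the demo's setup code.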

$range = \Rx\Observable::fromArray(range(0, 1000));

$source = $range
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->publishLast();

$published->subscribe($createStdoutObserver('SourceA'));
$published->subscribe($createStdoutObserver('SourceB'));

$connection = $published->connect();

   
Side effect
Side effect
SourceANext value: 1
SourceBNext value: 1
SourceAComplete!
SourceBComplete!
    

RxPHP also has an operator publishValue.

Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue. This operator is a specialization of Multicast using a BehaviorSubject.

Sample Code

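// From https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishValue.php
// $createStdoutObserver is not defined in this excerpt; it is assumed to come from the demo's setup code.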

$range = \Rx\Observable::fromArray(range(0, 1000));

$source = $range
    ->take(2)
    ->doOnNext(function ($x) {
        echo "Side effect\n";
    });

$published = $source->publishValue(42);

$published->subscribe($createStdoutObserver('SourceA'));
$published->subscribe($createStdoutObserver('SourceB'));

$connection = $published->connect();

   
SourceANext value: 42
SourceBNext value: 42
Side effect
SourceANext value: 0
SourceBNext value: 0
Side effect
SourceANext value: 1
SourceBNext value: 1
SourceAComplete!
SourceBComplete!
    
