The Subscribe operator is the glue that connects an observer to an Observable. In order for an observer to see the items being emitted by an Observable, or to receive error or completed notifications from the Observable, it must first subscribe to that Observable with this operator.
A typical implementation of the Subscribe operator may accept one to three methods (which then constitute the observer), or it may accept an object (sometimes called an Observer or Subscriber) that implements the interface which includes those three methods:
onNext: An Observable calls this method whenever it emits an item. The method takes the emitted item as its parameter.
onError: An Observable calls this method to indicate that it has encountered an error. After this call the Observable makes no further calls to onNext or onCompleted. The onError method takes as its parameter an indication of what caused the error (sometimes an object like an Exception or Throwable, other times a simple string, depending on the implementation).
onCompleted: An Observable calls this method after it has called onNext for the final time, if it has not encountered any errors.
An Observable is called a “cold” Observable if it does not begin to emit items until an observer has subscribed to it. An Observable is called a “hot” Observable if it may begin emitting items at any time; a subscriber may then begin observing the sequence at some point after it has started, missing any items emitted before the subscription.
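The difference between cold and hot sources can be illustrated with a small, library-free sketch. The helper names used here (coldRange, makeHotSource, recordingObserver) are hypothetical and are not part of any Rx distribution; they only mimic the observer contract described above.

```javascript
// Illustration only: these helpers are hypothetical, not Rx APIs.

// Cold: the producer runs inside subscribe, so every observer
// receives the whole sequence from the beginning.
function coldRange(start, count) {
  return {
    subscribe(observer) {
      for (let i = start; i < start + count; i++) {
        observer.onNext(i);
      }
      observer.onCompleted();
    }
  };
}

// Hot: items are pushed whether or not anyone is listening.
function makeHotSource() {
  const observers = [];
  return {
    subscribe(observer) { observers.push(observer); },
    emit(item) { observers.forEach(o => o.onNext(item)); },
    complete() { observers.forEach(o => o.onCompleted()); }
  };
}

// Builds an observer that records every notification into `log`.
function recordingObserver(log) {
  return {
    onNext(item) { log.push('next ' + item); },
    onError(err) { log.push('error ' + err); },
    onCompleted() { log.push('completed'); }
  };
}

const coldLog = [];
coldRange(0, 3).subscribe(recordingObserver(coldLog));

const hotLog = [];
const hot = makeHotSource();
hot.emit('a');                              // nobody is subscribed yet, so 'a' is missed
hot.subscribe(recordingObserver(hotLog));
hot.emit('b');
hot.complete();

console.log(coldLog); // [ 'next 0', 'next 1', 'next 2', 'completed' ]
console.log(hotLog);  // [ 'next b', 'completed' ]
```

The cold source replays everything for its observer, while the late subscriber to the hot source never sees the item emitted before it subscribed.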
RxGroovy implements several variants of subscribe.
If you pass it no parameters, it will trigger a subscription to the underlying Observable, but will ignore its emissions and notifications. This will activate a cold Observable.
You can also pass it between one and three functions; these will be interpreted as follows:
one function: onNext
two functions: onNext and onError
three functions: onNext, onError, and onCompleted
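The positional interpretation above can be sketched in plain JavaScript, the language of this page's other samples. toObserver is a hypothetical helper written for this illustration, not an RxGroovy or RxJS function, and its choice to rethrow an unhandled error is an assumption of the sketch rather than a statement about any library.

```javascript
// Hypothetical helper, for illustration only; not an Rx library function.
function toObserver(onNext, onError, onCompleted) {
  return {
    onNext: onNext || function () {},
    // Assumption of this sketch: an unhandled error is rethrown, not swallowed.
    onError: onError || function (err) { throw err; },
    onCompleted: onCompleted || function () {}
  };
}

const events = [];

// One function: it is treated as onNext, so only items are handled.
const itemsOnly = toObserver(x => events.push('next ' + x));
itemsOnly.onNext(1);
itemsOnly.onCompleted(); // default no-op: nothing is recorded

// Three functions: onNext, onError, onCompleted, in that order.
const full = toObserver(
  x => events.push('next ' + x),
  e => events.push('error ' + e.message),
  () => events.push('completed')
);
full.onNext(2);
full.onError(new Error('boom'));

console.log(events); // [ 'next 1', 'next 2', 'error boom' ]
```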
Finally, you can pass it an object that implements either of the Observer or Subscriber interfaces. The Observer interface consists of the three previously described “on” methods. The Subscriber interface implements these also, and adds a number of additional methods that facilitate reactive pull backpressure and that permit the Subscriber to unsubscribe from an Observable before it completes.
The call to subscribe returns an object that implements the Subscription interface. This interface includes the unsubscribe method that you can call at any time to sever the subscription that subscribe established between the Observable and the observer (or the methods that stand in for the observer).
subscribe()
subscribe(Action1)
subscribe(Action1,Action1)
subscribe(Action1,Action1,Action0)
subscribe(Observer)
subscribe(Subscriber)
The forEach operators are simpler versions of subscribe. You can pass them between one and three functions, which will be interpreted as follows:
one function: onNext
two functions: onNext and onError
three functions: onNext, onError, and onCompleted
Unlike subscribe, forEach does not return an object with which you can cancel the subscription. Nor do you have the option of passing a parameter that has this capability. So you should only use this operator if you definitely need to operate on all of the emissions and notifications from the Observable.
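The practical consequence of forEach returning nothing can be sketched in plain JavaScript. makeCounter is a hypothetical stand-in for an Observable, written only for this illustration; it is not RxGroovy code and does not reproduce any library's implementation.

```javascript
// Hypothetical stand-in for an Observable; not a real Rx API.
function makeCounter() {
  const handlers = new Set();
  const api = {
    subscribe(onNext) {
      handlers.add(onNext);
      // The caller gets a handle that can end the subscription early.
      return { unsubscribe() { handlers.delete(onNext); } };
    },
    // forEach subscribes in the same way but hands nothing back.
    forEach(onNext) {
      api.subscribe(onNext);
    },
    emit(item) { handlers.forEach(fn => fn(item)); }
  };
  return api;
}

const viaSubscribe = [];
const viaForEach = [];
const counter = makeCounter();

const handle = counter.subscribe(x => viaSubscribe.push(x));
const nothing = counter.forEach(x => viaForEach.push(x));

counter.emit(1);
handle.unsubscribe(); // only the subscribe() caller can stop early
counter.emit(2);

console.log(viaSubscribe); // [ 1 ]
console.log(viaForEach);   // [ 1, 2 ]
console.log(nothing);      // undefined
```

The forEach caller keeps receiving items because it has no handle to cancel with, which is why forEach suits only consumers that want the entire sequence.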
forEach(Action1)
forEach(Action1,Action1)
forEach(Action1,Action1,Action0)
RxJava implements several variants of subscribe.
If you pass it no parameters, it will trigger a subscription to the underlying Observable, but will ignore its emissions and notifications. This will activate a cold Observable.
You can also pass it between one and three functions; these will be interpreted as follows:
one function: onNext
two functions: onNext and onError
three functions: onNext, onError, and onCompleted
Finally, you can pass it an object that implements either of the Observer or Subscriber interfaces. The Observer interface consists of the three previously described “on” methods. The Subscriber interface implements these also, and adds a number of additional methods that facilitate reactive pull backpressure and that permit the Subscriber to unsubscribe from an Observable before it completes.
The call to subscribe returns an object that implements the Subscription interface. This interface includes the unsubscribe method that you can call at any time to sever the subscription that subscribe established between the Observable and the observer (or the methods that stand in for the observer).
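The effect of unsubscribe can be sketched as follows, in plain JavaScript to match the page's other samples. makeSource is a hypothetical helper, not an RxJava API; the sketch only shows the idea of a returned object that severs the link between a source and its observer.

```javascript
// Hand-rolled illustration only: makeSource is hypothetical, not an Rx API.
function makeSource() {
  const observers = new Set();
  return {
    subscribe(onNext) {
      observers.add(onNext);
      // The returned object plays the role of a Subscription.
      return {
        unsubscribe() { observers.delete(onNext); }
      };
    },
    emit(item) { observers.forEach(fn => fn(item)); }
  };
}

const received = [];
const source = makeSource();
const subscription = source.subscribe(item => received.push(item));

source.emit(1);
source.emit(2);
subscription.unsubscribe(); // sever the link between source and observer
source.emit(3);             // no longer delivered

console.log(received); // [ 1, 2 ]
```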
subscribe()
subscribe(Action1)
subscribe(Action1,Action1)
subscribe(Action1,Action1,Action0)
subscribe(Observer)
subscribe(Subscriber)
The forEach operators are simpler versions of subscribe. You can pass them between one and three functions, which will be interpreted as follows:
one function: onNext
two functions: onNext and onError
three functions: onNext, onError, and onCompleted
Unlike subscribe, forEach does not return an object with which you can cancel the subscription. Nor do you have the option of passing a parameter that has this capability. So you should only use this operator if you definitely need to operate on all of the emissions and notifications from the Observable.
forEach(Action1)
forEach(Action1,Action1)
forEach(Action1,Action1,Action0)
In RxJS, you can subscribe to an Observable in two ways:
1. subscribe a single function to either the onNext, the onCompleted, or the onError notifications from an Observable, with subscribeOnNext, subscribeOnCompleted, or subscribeOnError respectively
2. subscribe by passing individual functions, or an observer object that implements them, to either the subscribe or forEach operator (those operators behave identically)

    var source = Rx.Observable.range(0, 3);

    var subscription = source.subscribeOnNext(
      function (x) {
        console.log('Next: %s', x);
      });

    Next: 0
    Next: 1
    Next: 2

    var source = Rx.Observable.range(0, 3);

    var subscription = source.subscribeOnCompleted(
      function () {
        console.log('Completed');
      });

    Completed

    var source = Rx.Observable.throw(new Error());

    var subscription = source.subscribeOnError(
      function (err) {
        console.log('Error: %s', err);
      });

    Error: Error

    var observer = Rx.Observer.create(
      function (x) {
        console.log('Next: %s', x);
      },
      function (err) {
        console.log('Error: %s', err);
      },
      function () {
        console.log('Completed');
      });

    var source = Rx.Observable.range(0, 3);

    var subscription = source.subscribe(observer);

    Next: 0
    Next: 1
    Next: 2
    Completed

    var source = Rx.Observable.range(0, 3);

    var subscription = source.subscribe(
      function (x) {
        console.log('Next: %s', x);
      },
      function (err) {
        console.log('Error: %s', err);
      },
      function () {
        console.log('Completed');
      });

    Next: 0
    Next: 1
    Next: 2
    Completed

The functions described in this section are all found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js