There are a number of ways that programming languages have for obtaining values as the result of calculations, with names like functions, futures, actions, callables, runnables, and so forth. The operators grouped here under the Start operator category make these things behave like Observables so that they can be chained with other Observables in an Observable cascade
TBD
TBD
The various RxGroovy implementations of Start are found in the
optional rxjava-async module.
The rxjava-async module includes the start operator, which accepts
a function as its parameter, calls that function to retrieve a value, and then returns an
Observable that will emit that value to each subsequent observer.
Note that the function will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async module also includes the toAsync,
asyncAction, and asyncFunc operators. These accept a function or
an Action as their parameter. In the case of a function, this variant of the operator calls
that function to retrieve a value, and then returns an Observable that will emit that value
to each subsequent observer (just as the start operator does).
In the case of Action, the process is similar, but there is no return value. In this case,
the Observable created by this operator will emit a null before terminating.
Note that the function or Action will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async module also includes the startFuture operator.
You pass it a function that returns a Future. startFuture calls this
function immediately to obtain the Future, and calls the
Future’s get method to try to obtain its value. It returns an
Observable to which it will emit this value to any subsequent observers.
The rxjava-async module also includes the deferFuture operator.
You pass it a function that returns a Future that returns an Observable.
deferFuture returns an Observable, but does not call the function you provide
until such time as an observer subscribes to the Observable it returns. When it does so, it
immediately calls get on the resulting Future, and then mirrors the
emissions from the Observable returned by the Future as its own emissions.
In this way you can include a Future that returns an Observable in a cascade of
Observables as a peer to other Observables.
The rxjava-async module also includes the fromAction operator.
It accepts an Action as its parameter, and returns an Observable that emits the
item you pass to fromAction upon termination of the Action
The rxjava-async module also includes the fromCallable operator.
It accepts a Callable as its parameter, and returns an Observable that emits the
result of this callable as its sole emission.
The rxjava-async module also includes the fromRunnable operator.
It accepts a Runnable as its parameter, and returns an Observable that emits the
item you pass to fromRunnable upon termination of the Runnable
The rxjava-async module also includes the forEachFuture operator.
It is not really a variant of the Start operator, but something
all its own. You pass forEachFuture some subset of the typical observer methods
(onNext, onError, and onCompleted) and the Observable
will call these methods in the usual way. But forEachFuture itself returns a
Future that blocks on get until the source Observable completes,
then returns either the completion or error, depending on how the Observable completed.
You can use this if you need a function that blocks until the completion of an Observable.
The rxjava-async module also includes the runAsync operator. It is
peculiar in that it creates a specialization of an Observable called a
StoppableObservable.
Pass runAsync an Action and a
Scheduler, and it will
return a StoppableObservable that uses the specified Action to
generate items that it emits. The Action accepts an Observer and a
Subscription. It uses the Subscription to check for the
unsubscribed condition, upon which it will stop emitting items. You can also
manually stop a StoppableObservable at any time by calling its
unsubscribe method (which will also unsubscribe the Subscription you
have associated with the StoppableObservable).
Because runAsync immediately invokes the Action and begins emitting
the items (that is, it produces a hot Observable), it is possible that some items may
be lost in the interval between when you establish the StoppableObservable with
this operator and when your Observer is ready to receive items. If this is a
problem, you can use the variant of runAsync that also accepts a
Subject and pass a ReplaySubject with which you can retrieve the
otherwise-missing items.
In RxGroovy there is also a version of the
From operator that converts a
Future into an Observable, and in this way resembles the
Start operator.
The various RxJava implementations of Start are found in the
optional rxjava-async module.
The rxjava-async module includes the start operator, which accepts
a function as its parameter, calls that function to retrieve a value, and then returns an
Observable that will emit that value to each subsequent observer.
Note that the function will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async module also includes the toAsync,
asyncAction, and asyncFunc operators. These accept a function or
an Action as their parameter. In the case of a function, this variant of the operator calls
that function to retrieve a value, and then returns an Observable that will emit that value
to each subsequent observer (just as the start operator does).
In the case of Action, the process is similar, but there is no return value. In this case,
the Observable created by this operator will emit a null before terminating.
Note that the function or Action will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async module also includes the startFuture operator.
You pass it a function that returns a Future. startFuture calls this
function immediately to obtain the Future, and calls the
Future’s get method to try to obtain its value. It returns an
Observable to which it will emit this value to any subsequent observers.
The rxjava-async module also includes the deferFuture operator.
You pass it a function that returns a Future that returns an Observable.
deferFuture returns an Observable, but does not call the function you provide
until such time as an observer subscribes to the Observable it returns. When it does so, it
immediately calls get on the resulting Future, and then mirrors the
emissions from the Observable returned by the Future as its own emissions.
In this way you can include a Future that returns an Observable in a cascade of
Observables as a peer to other Observables.
The rxjava-async module also includes the fromAction operator.
It accepts an Action as its parameter, and returns an Observable that emits the
item you pass to fromAction upon termination of the Action
The rxjava-async module also includes the fromCallable operator.
It accepts a Callable as its parameter, and returns an Observable that emits the
result of this callable as its sole emission.
The rxjava-async module also includes the fromRunnable operator.
It accepts a Runnable as its parameter, and returns an Observable that emits the
item you pass to fromRunnable upon termination of the Runnable
The rxjava-async module also includes the forEachFuture operator.
It is not really a variant of the Start operator, but something
all its own. You pass forEachFuture some subset of the typical observer methods
(onNext, onError, and onCompleted) and the Observable
will call these methods in the usual way. But forEachFuture itself returns a
Future that blocks on get until the source Observable completes,
then returns either the completion or error, depending on how the Observable completed.
You can use this if you need a function that blocks until the completion of an Observable.
The rxjava-async module also includes the runAsync operator. It is
peculiar in that it creates a specialization of an Observable called a
StoppableObservable.
Pass runAsync an Action and a
Scheduler, and it will
return a StoppableObservable that uses the specified Action to
generate items that it emits. The Action accepts an Observer and a
Subscription. It uses the Subscription to check for the
unsubscribed condition, upon which it will stop emitting items. You can also
manually stop a StoppableObservable at any time by calling its
unsubscribe method (which will also unsubscribe the Subscription you
have associated with the StoppableObservable).
Because runAsync immediately invokes the Action and begins emitting
the items (that is, it produces a hot Observable), it is possible that some items may
be lost in the interval between when you establish the StoppableObservable with
this operator and when your Observer is ready to receive items. If this is a
problem, you can use the variant of runAsync that also accepts a
Subject and pass a ReplaySubject with which you can retrieve the
otherwise-missing items.
In RxJava there is also a version of the
From operator that converts a
Future into an Observable, and in this way resembles the
Start operator.
RxJS implements the start operator. It takes as its parameters a function whose
return value will be the emission from the resulting Observable, and, optionally, any
additional parameter to that function and a Scheduler on which to run the
function.
var context = { value: 42 };
var source = Rx.Observable.start(
function () {
return this.value;
},
context,
Rx.Scheduler.timeout
);
var subscription = source.subscribe(
function (x) {
console.log('Next: ' + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});Next: 42 Completed
start is found in the following distributions:
rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)rx.lite.jsrx.lite.compat.js
RxJS also implements the startAsync operator. It takes as its parameters an
asynchronous function whose return value will be the emission from the resulting Observable.
You can convert a function into an asynchronous function with the toAsync
method. It takes a function, function parameter, and Scheduler as
parameters, and returns an asynchronous function that will be invoked on the specified Scheduler. The
last two parameters are optional; if you do not specify a Scheduler, the timeout Scheduler
will be used by default.
var source = Rx.Observable.startAsync(function () {
return RSVP.Promise.resolve(42);
});
var subscription = source.subscribe(
function (x) {
console.log('Next: ' + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});Next: 42 Completed
startAsync is found in the following distributions:
rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)rx.lite.jsrx.lite.compat.js
toAsync is found in the following distributions:
rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
RxPHP implements this operator as start.
Invokes the specified function asynchronously on the specified scheduler, surfacing the result through an observable sequence.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/start/start.php
$source = Rx\Observable::start(function () {
return 42;
});
$source->subscribe($stdoutObserver);
Next value: 42
Complete!
TBD
TBD