There are a number of ways that programming languages have for obtaining values as the result of calculations, with names like functions, futures, actions, callables, runnables, and so forth. The operators grouped here under the Start operator category make these things behave like Observables so that they can be chained with other Observables in an Observable cascade
TBD
TBD
The various RxGroovy implementations of Start are found in the
optional rxjava-async
module.
The rxjava-async
module includes the start
operator, which accepts
a function as its parameter, calls that function to retrieve a value, and then returns an
Observable that will emit that value to each subsequent observer.
Note that the function will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async
module also includes the toAsync
,
asyncAction
, and asyncFunc
operators. These accept a function or
an Action as their parameter. In the case of a function, this variant of the operator calls
that function to retrieve a value, and then returns an Observable that will emit that value
to each subsequent observer (just as the start
operator does).
In the case of Action, the process is similar, but there is no return value. In this case,
the Observable created by this operator will emit a null
before terminating.
Note that the function or Action will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async
module also includes the startFuture
operator.
You pass it a function that returns a Future
. startFuture
calls this
function immediately to obtain the Future
, and calls the
Future
’s get
method to try to obtain its value. It returns an
Observable to which it will emit this value to any subsequent observers.
The rxjava-async
module also includes the deferFuture
operator.
You pass it a function that returns a Future
that returns an Observable.
deferFuture
returns an Observable, but does not call the function you provide
until such time as an observer subscribes to the Observable it returns. When it does so, it
immediately calls get
on the resulting Future
, and then mirrors the
emissions from the Observable returned by the Future
as its own emissions.
In this way you can include a Future
that returns an Observable in a cascade of
Observables as a peer to other Observables.
The rxjava-async
module also includes the fromAction
operator.
It accepts an Action
as its parameter, and returns an Observable that emits the
item you pass to fromAction
upon termination of the Action
The rxjava-async
module also includes the fromCallable
operator.
It accepts a Callable
as its parameter, and returns an Observable that emits the
result of this callable as its sole emission.
The rxjava-async
module also includes the fromRunnable
operator.
It accepts a Runnable
as its parameter, and returns an Observable that emits the
item you pass to fromRunnable
upon termination of the Runnable
The rxjava-async
module also includes the forEachFuture
operator.
It is not really a variant of the Start operator, but something
all its own. You pass forEachFuture
some subset of the typical observer methods
(onNext
, onError
, and onCompleted
) and the Observable
will call these methods in the usual way. But forEachFuture
itself returns a
Future
that blocks on get
until the source Observable completes,
then returns either the completion or error, depending on how the Observable completed.
You can use this if you need a function that blocks until the completion of an Observable.
The rxjava-async
module also includes the runAsync
operator. It is
peculiar in that it creates a specialization of an Observable called a
StoppableObservable
.
Pass runAsync
an Action
and a
Scheduler
, and it will
return a StoppableObservable
that uses the specified Action
to
generate items that it emits. The Action
accepts an Observer
and a
Subscription
. It uses the Subscription
to check for the
unsubscribed
condition, upon which it will stop emitting items. You can also
manually stop a StoppableObservable
at any time by calling its
unsubscribe
method (which will also unsubscribe the Subscription
you
have associated with the StoppableObservable
).
Because runAsync
immediately invokes the Action
and begins emitting
the items (that is, it produces a hot Observable), it is possible that some items may
be lost in the interval between when you establish the StoppableObservable
with
this operator and when your Observer
is ready to receive items. If this is a
problem, you can use the variant of runAsync
that also accepts a
Subject
and pass a ReplaySubject
with which you can retrieve the
otherwise-missing items.
In RxGroovy there is also a version of the
From operator that converts a
Future
into an Observable, and in this way resembles the
Start operator.
The various RxJava implementations of Start are found in the
optional rxjava-async
module.
The rxjava-async
module includes the start
operator, which accepts
a function as its parameter, calls that function to retrieve a value, and then returns an
Observable that will emit that value to each subsequent observer.
Note that the function will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async
module also includes the toAsync
,
asyncAction
, and asyncFunc
operators. These accept a function or
an Action as their parameter. In the case of a function, this variant of the operator calls
that function to retrieve a value, and then returns an Observable that will emit that value
to each subsequent observer (just as the start
operator does).
In the case of Action, the process is similar, but there is no return value. In this case,
the Observable created by this operator will emit a null
before terminating.
Note that the function or Action will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async
module also includes the startFuture
operator.
You pass it a function that returns a Future
. startFuture
calls this
function immediately to obtain the Future
, and calls the
Future
’s get
method to try to obtain its value. It returns an
Observable to which it will emit this value to any subsequent observers.
The rxjava-async
module also includes the deferFuture
operator.
You pass it a function that returns a Future
that returns an Observable.
deferFuture
returns an Observable, but does not call the function you provide
until such time as an observer subscribes to the Observable it returns. When it does so, it
immediately calls get
on the resulting Future
, and then mirrors the
emissions from the Observable returned by the Future
as its own emissions.
In this way you can include a Future
that returns an Observable in a cascade of
Observables as a peer to other Observables.
The rxjava-async
module also includes the fromAction
operator.
It accepts an Action
as its parameter, and returns an Observable that emits the
item you pass to fromAction
upon termination of the Action
The rxjava-async
module also includes the fromCallable
operator.
It accepts a Callable
as its parameter, and returns an Observable that emits the
result of this callable as its sole emission.
The rxjava-async
module also includes the fromRunnable
operator.
It accepts a Runnable
as its parameter, and returns an Observable that emits the
item you pass to fromRunnable
upon termination of the Runnable
The rxjava-async
module also includes the forEachFuture
operator.
It is not really a variant of the Start operator, but something
all its own. You pass forEachFuture
some subset of the typical observer methods
(onNext
, onError
, and onCompleted
) and the Observable
will call these methods in the usual way. But forEachFuture
itself returns a
Future
that blocks on get
until the source Observable completes,
then returns either the completion or error, depending on how the Observable completed.
You can use this if you need a function that blocks until the completion of an Observable.
The rxjava-async
module also includes the runAsync
operator. It is
peculiar in that it creates a specialization of an Observable called a
StoppableObservable
.
Pass runAsync
an Action
and a
Scheduler
, and it will
return a StoppableObservable
that uses the specified Action
to
generate items that it emits. The Action
accepts an Observer
and a
Subscription
. It uses the Subscription
to check for the
unsubscribed
condition, upon which it will stop emitting items. You can also
manually stop a StoppableObservable
at any time by calling its
unsubscribe
method (which will also unsubscribe the Subscription
you
have associated with the StoppableObservable
).
Because runAsync
immediately invokes the Action
and begins emitting
the items (that is, it produces a hot Observable), it is possible that some items may
be lost in the interval between when you establish the StoppableObservable
with
this operator and when your Observer
is ready to receive items. If this is a
problem, you can use the variant of runAsync
that also accepts a
Subject
and pass a ReplaySubject
with which you can retrieve the
otherwise-missing items.
In RxJava there is also a version of the
From operator that converts a
Future
into an Observable, and in this way resembles the
Start operator.
RxJS implements the start
operator. It takes as its parameters a function whose
return value will be the emission from the resulting Observable, and, optionally, any
additional parameter to that function and a Scheduler on which to run the
function.
var context = { value: 42 }; var source = Rx.Observable.start( function () { return this.value; }, context, Rx.Scheduler.timeout ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
start
is found in the following distributions:
rx.async.js
(requires rx.binding.js
and either rx.js
or rx.compat.js
)rx.async.compat.js
(requires rx.binding.js
and either rx.js
or rx.compat.js
)rx.lite.js
rx.lite.compat.js
RxJS also implements the startAsync
operator. It takes as its parameters an
asynchronous function whose return value will be the emission from the resulting Observable.
You can convert a function into an asynchronous function with the toAsync
method. It takes a function, function parameter, and Scheduler as
parameters, and returns an asynchronous function that will be invoked on the specified Scheduler. The
last two parameters are optional; if you do not specify a Scheduler, the timeout
Scheduler
will be used by default.
var source = Rx.Observable.startAsync(function () { return RSVP.Promise.resolve(42); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 42 Completed
startAsync
is found in the following distributions:
rx.async.js
(requires rx.binding.js
and either rx.js
or rx.compat.js
)rx.async.compat.js
(requires rx.binding.js
and either rx.js
or rx.compat.js
)rx.lite.js
rx.lite.compat.js
toAsync
is found in the following distributions:
rx.async.js
(requires rx.binding.js
and either rx.js
or rx.compat.js
)rx.async.compat.js
(requires rx.binding.js
and either rx.js
or rx.compat.js
)
RxPHP implements this operator as start
.
Invokes the specified function asynchronously on the specified scheduler, surfacing the result through an observable sequence.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/start/start.php $source = Rx\Observable::start(function () { return 42; }); $source->subscribe($stdoutObserver);
Next value: 42 Complete!
TBD
TBD