Start

create an Observable that emits the return value of a function-like directive

Start

There are a number of ways that programming languages have for obtaining values as the result of calculations, with names like functions, futures, actions, callables, runnables, and so forth. The operators grouped here under the Start operator category make these things behave like Observables so that they can be chained with other Observables in an Observable cascade

See Also

Language-Specific Information:

The various RxGroovy implementations of Start are found in the optional rxjava-async module.

start

The rxjava-async module includes the start operator, which accepts a function as its parameter, calls that function to retrieve a value, and then returns an Observable that will emit that value to each subsequent observer.

Note that the function will only be executed once, even if more than one observer subscribes to the resulting Observable.

toAsync

The rxjava-async module also includes the toAsync, asyncAction, and asyncFunc operators. These accept a function or an Action as their parameter. In the case of a function, this variant of the operator calls that function to retrieve a value, and then returns an Observable that will emit that value to each subsequent observer (just as the start operator does).

In the case of Action, the process is similar, but there is no return value. In this case, the Observable created by this operator will emit a null before terminating.

Note that the function or Action will only be executed once, even if more than one observer subscribes to the resulting Observable.

startFuture

The rxjava-async module also includes the startFuture operator. You pass it a function that returns a Future. startFuture calls this function immediately to obtain the Future, and calls the Future’s get method to try to obtain its value. It returns an Observable to which it will emit this value to any subsequent observers.

deferFuture

The rxjava-async module also includes the deferFuture operator. You pass it a function that returns a Future that returns an Observable. deferFuture returns an Observable, but does not call the function you provide until such time as an observer subscribes to the Observable it returns. When it does so, it immediately calls get on the resulting Future, and then mirrors the emissions from the Observable returned by the Future as its own emissions.

In this way you can include a Future that returns an Observable in a cascade of Observables as a peer to other Observables.

fromAction

The rxjava-async module also includes the fromAction operator. It accepts an Action as its parameter, and returns an Observable that emits the item you pass to fromAction upon termination of the Action

fromCallable

The rxjava-async module also includes the fromCallable operator. It accepts a Callable as its parameter, and returns an Observable that emits the result of this callable as its sole emission.

fromRunnable

The rxjava-async module also includes the fromRunnable operator. It accepts a Runnable as its parameter, and returns an Observable that emits the item you pass to fromRunnable upon termination of the Runnable

forEachFuture

The rxjava-async module also includes the forEachFuture operator. It is not really a variant of the Start operator, but something all its own. You pass forEachFuture some subset of the typical observer methods (onNext, onError, and onCompleted) and the Observable will call these methods in the usual way. But forEachFuture itself returns a Future that blocks on get until the source Observable completes, then returns either the completion or error, depending on how the Observable completed.

You can use this if you need a function that blocks until the completion of an Observable.

The rxjava-async module also includes the runAsync operator. It is peculiar in that it creates a specialization of an Observable called a StoppableObservable.

Pass runAsync an Action and a Scheduler, and it will return a StoppableObservable that uses the specified Action to generate items that it emits. The Action accepts an Observer and a Subscription. It uses the Subscription to check for the unsubscribed condition, upon which it will stop emitting items. You can also manually stop a StoppableObservable at any time by calling its unsubscribe method (which will also unsubscribe the Subscription you have associated with the StoppableObservable).

Because runAsync immediately invokes the Action and begins emitting the items (that is, it produces a hot Observable), it is possible that some items may be lost in the interval between when you establish the StoppableObservable with this operator and when your Observer is ready to receive items. If this is a problem, you can use the variant of runAsync that also accepts a Subject and pass a ReplaySubject with which you can retrieve the otherwise-missing items.

In RxGroovy there is also a version of the From operator that converts a Future into an Observable, and in this way resembles the Start operator.

The various RxJava implementations of Start are found in the optional rxjava-async module.

start

The rxjava-async module includes the start operator, which accepts a function as its parameter, calls that function to retrieve a value, and then returns an Observable that will emit that value to each subsequent observer.

Note that the function will only be executed once, even if more than one observer subscribes to the resulting Observable.

toAsync

The rxjava-async module also includes the toAsync, asyncAction, and asyncFunc operators. These accept a function or an Action as their parameter. In the case of a function, this variant of the operator calls that function to retrieve a value, and then returns an Observable that will emit that value to each subsequent observer (just as the start operator does).

In the case of Action, the process is similar, but there is no return value. In this case, the Observable created by this operator will emit a null before terminating.

Note that the function or Action will only be executed once, even if more than one observer subscribes to the resulting Observable.

startFuture

The rxjava-async module also includes the startFuture operator. You pass it a function that returns a Future. startFuture calls this function immediately to obtain the Future, and calls the Future’s get method to try to obtain its value. It returns an Observable to which it will emit this value to any subsequent observers.

deferFuture

The rxjava-async module also includes the deferFuture operator. You pass it a function that returns a Future that returns an Observable. deferFuture returns an Observable, but does not call the function you provide until such time as an observer subscribes to the Observable it returns. When it does so, it immediately calls get on the resulting Future, and then mirrors the emissions from the Observable returned by the Future as its own emissions.

In this way you can include a Future that returns an Observable in a cascade of Observables as a peer to other Observables.

fromAction

The rxjava-async module also includes the fromAction operator. It accepts an Action as its parameter, and returns an Observable that emits the item you pass to fromAction upon termination of the Action

fromCallable

The rxjava-async module also includes the fromCallable operator. It accepts a Callable as its parameter, and returns an Observable that emits the result of this callable as its sole emission.

fromRunnable

The rxjava-async module also includes the fromRunnable operator. It accepts a Runnable as its parameter, and returns an Observable that emits the item you pass to fromRunnable upon termination of the Runnable

forEachFuture

The rxjava-async module also includes the forEachFuture operator. It is not really a variant of the Start operator, but something all its own. You pass forEachFuture some subset of the typical observer methods (onNext, onError, and onCompleted) and the Observable will call these methods in the usual way. But forEachFuture itself returns a Future that blocks on get until the source Observable completes, then returns either the completion or error, depending on how the Observable completed.

You can use this if you need a function that blocks until the completion of an Observable.

The rxjava-async module also includes the runAsync operator. It is peculiar in that it creates a specialization of an Observable called a StoppableObservable.

Pass runAsync an Action and a Scheduler, and it will return a StoppableObservable that uses the specified Action to generate items that it emits. The Action accepts an Observer and a Subscription. It uses the Subscription to check for the unsubscribed condition, upon which it will stop emitting items. You can also manually stop a StoppableObservable at any time by calling its unsubscribe method (which will also unsubscribe the Subscription you have associated with the StoppableObservable).

Because runAsync immediately invokes the Action and begins emitting the items (that is, it produces a hot Observable), it is possible that some items may be lost in the interval between when you establish the StoppableObservable with this operator and when your Observer is ready to receive items. If this is a problem, you can use the variant of runAsync that also accepts a Subject and pass a ReplaySubject with which you can retrieve the otherwise-missing items.

In RxJava there is also a version of the From operator that converts a Future into an Observable, and in this way resembles the Start operator.

start

RxJS implements the start operator. It takes as its parameters a function whose return value will be the emission from the resulting Observable, and, optionally, any additional parameter to that function and a Scheduler on which to run the function.

Sample Code

var context = { value: 42 };

var source = Rx.Observable.start(
    function () {
        return this.value;
    },
    context,
    Rx.Scheduler.timeout
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 42
Completed

start is found in the following distributions:

  • rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
  • rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js
start

RxJS also implements the startAsync operator. It takes as its parameters an asynchronous function whose return value will be the emission from the resulting Observable.

You can convert a function into an asynchronous function with the toAsync method. It takes a function, function parameter, and Scheduler as parameters, and returns an asynchronous function that will be invoked on the specified Scheduler. The last two parameters are optional; if you do not specify a Scheduler, the timeout Scheduler will be used by default.

Sample Code

var source = Rx.Observable.startAsync(function () {
    return RSVP.Promise.resolve(42);
});

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 42
Completed

startAsync is found in the following distributions:

  • rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
  • rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

toAsync is found in the following distributions:

  • rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
  • rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)

RxPHP implements this operator as start.

Invokes the specified function asynchronously on the specified scheduler, surfacing the result through an observable sequence.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/start/start.php

$source = Rx\Observable::start(function () {
    return 42;
});

$source->subscribe($stdoutObserver);

   
Next value: 42
Complete!
    

TBD