When you work with Observables, it can be more convenient if all of the data you mean to work with can be represented as Observables, rather than as a mixture of Observables and other types. This allows you to use a single set of operators to govern the entire lifespan of the data stream.
Iterables, for example, can be thought of as a sort of synchronous Observable; Futures, as a sort of Observable that always emits only a single item. By explicitly converting such objects to Observables, you allow them to interact as peers with other Observables.
For this reason, most ReactiveX implementations have methods that allow you to convert certain language-specific objects and data structures into Observables.
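The idea that an Iterable behaves like a synchronous Observable, and a Future like a single-item Observable, can be sketched in plain JavaScript. This is a minimal, hand-rolled illustration and not the code of any ReactiveX implementation. The names fromIterableSketch, fromPromiseSketch, and logObserver are hypothetical, and a Promise stands in for a Future.

```javascript
// Minimal, hand-rolled sketch; not the code of any ReactiveX library.
// An "observable" here is just an object with a subscribe(observer) method.

// Convert any iterable into an observable that emits each item synchronously.
function fromIterableSketch(iterable) {
  return {
    subscribe(observer) {
      try {
        for (const item of iterable) {
          observer.onNext(item);
        }
      } catch (err) {
        observer.onError(err);
        return;
      }
      observer.onCompleted();
    }
  };
}

// Convert a Promise (a future-like value) into an observable that emits
// its single result and then completes, or reports its rejection.
function fromPromiseSketch(promise) {
  return {
    subscribe(observer) {
      promise.then(
        value => { observer.onNext(value); observer.onCompleted(); },
        err => observer.onError(err)
      );
    }
  };
}

// The same observer shape works for both kinds of source.
function logObserver(label) {
  return {
    onNext: x => console.log(label + ' next: ' + x),
    onError: e => console.log(label + ' error: ' + e),
    onCompleted: () => console.log(label + ' completed')
  };
}

fromIterableSketch([1, 2, 3]).subscribe(logObserver('iterable'));
fromPromiseSketch(Promise.resolve(42)).subscribe(logObserver('promise'));
```

Once both sources expose the same subscribe interface, one set of operators could in principle be written against that interface alone, which is the convenience the paragraphs above describe.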
In RxGroovy, the from operator can convert a Future, an Iterable, or an Array.
In the case of an Iterable or an Array, the resulting Observable will emit each item contained
in the Iterable or Array.
In the case of a Future, it will emit the single result of the get call. The version
of from that accepts a Future also optionally accepts two additional
parameters: a timeout span and the units of time that span is denominated in. The
resulting Observable will terminate with an error if that span of time passes before the
Future responds with a value.
from does not by default operate on any particular Scheduler.
However, the variant that converts a Future accepts a Scheduler as an optional second parameter, and
it will use that Scheduler to govern the Future.
from(array)
from(Iterable)
from(Future)
from(Future,Scheduler)
from(Future,timeout,timeUnit)
In addition, the RxJavaAsyncUtil package provides the following operators, which
convert actions, callables, functions, and runnables into
Observables that emit the results of those things:
fromAction
fromCallable
fromFunc0
fromRunnable
See the Start operator for more information about those operators.
Note that there is also a from operator that is a method of the optional
StringObservable class. It converts a stream of characters or a
Reader into an Observable that emits byte arrays or Strings.
In the separate RxJavaAsyncUtil package, which is not included by default with RxGroovy, there
is also a runAsync function. Pass runAsync an Action and a
Scheduler, and it will return a
StoppableObservable that uses the specified Action to generate items that it
emits.
The Action accepts an Observer and a Subscription. It uses the
Subscription to check for the isUnsubscribed condition, upon which it will stop
emitting items. You can also manually stop a StoppableObservable at any time by calling its
unsubscribe method (which will also unsubscribe the Subscription you have
associated with the StoppableObservable).
Because runAsync immediately invokes the Action and begins emitting the items, it
is possible that some items may be lost in the interval between when you establish the
StoppableObservable with this method and when your Observer is ready to receive
items. If this is a problem, you can use the variant of runAsync that also accepts a
Subject and pass a ReplaySubject with which you can
retrieve the otherwise-missing items.
The StringObservable class, which is not a default part of RxGroovy, also includes the
decode operator, which converts a stream of byte arrays containing multibyte characters into an
Observable that emits Strings that respect the character boundaries.
In RxJava, the from operator can convert a Future, an Iterable, or an Array.
In the case of an Iterable or an Array, the resulting Observable will emit each item contained
in the Iterable or Array.
Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable<Integer> myObservable = Observable.from(items);
myObservable.subscribe(
    // Called once for each emitted item
    new Action1<Integer>() {
        @Override
        public void call(Integer item) {
            System.out.println(item);
        }
    },
    // Called if the Observable terminates with an error
    new Action1<Throwable>() {
        @Override
        public void call(Throwable error) {
            System.out.println("Error encountered: " + error.getMessage());
        }
    },
    // Called when the Observable completes normally
    new Action0() {
        @Override
        public void call() {
            System.out.println("Sequence complete");
        }
    }
);
0
1
2
3
4
5
Sequence complete
In the case of a Future, it will emit the single result of the get call. The version
of from that accepts a Future also optionally accepts two additional
parameters: a timeout span and the units of time that span is denominated in. The
resulting Observable will terminate with an error if that span of time passes before the
Future responds with a value.
from does not by default operate on any particular Scheduler.
However, the variant that converts a Future accepts a Scheduler as an optional second parameter, and
it will use that Scheduler to govern the Future.
from(array)
from(Iterable)
from(Future)
from(Future,Scheduler)
from(Future,timeout,timeUnit)
In addition, the RxJavaAsyncUtil package provides the following operators, which
convert actions, callables, functions, and runnables into
Observables that emit the results of those things:
fromAction
fromCallable
fromFunc0
fromRunnable
See the Start operator for more information about those operators.
Note that there is also a from operator that is a method of the optional
StringObservable class. It converts a stream of characters or a
Reader into an Observable that emits byte arrays or Strings.
In the separate RxJavaAsyncUtil package, which is not included by default with RxJava, there
is also a runAsync function. Pass runAsync an Action and a
Scheduler, and it will return a
StoppableObservable that uses the specified Action to generate items that it
emits.
The Action accepts an Observer and a Subscription. It uses the
Subscription to check for the isUnsubscribed condition, upon which it will stop
emitting items. You can also manually stop a StoppableObservable at any time by calling its
unsubscribe method (which will also unsubscribe the Subscription you have
associated with the StoppableObservable).
Because runAsync immediately invokes the Action and begins emitting the items, it
is possible that some items may be lost in the interval between when you establish the
StoppableObservable with this method and when your Observer is ready to receive
items. If this is a problem, you can use the variant of runAsync that also accepts a
Subject and pass a ReplaySubject with which you can
retrieve the otherwise-missing items.
The StringObservable class, which is not a default part of RxJava, also includes the
decode operator, which converts a stream of byte arrays containing multibyte characters into an
Observable that emits Strings that respect the character boundaries.
There are several specialized From variants in RxJS.
In RxJS, the from operator converts an array-like or iterable object into an
Observable that emits the items in that array or iterable. A String, in this context, is
treated as an array of characters.
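This operator also takes three additional, optional parameters: a transforming function that takes an item from the array or iterable as input and produces the item to be emitted by the resulting Observable; an object to use as this when calling that function; and a Scheduler on which the operator should run.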
// Array-like object (arguments) to Observable
function f() {
  return Rx.Observable.from(arguments);
}
f(1, 2, 3).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 3
Completed
// Any iterable object...
// Set
var s = new Set(['foo', window]);
Rx.Observable.from(s).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: foo
Next: window
Completed
// Map
var m = new Map([[1, 2], [2, 4], [4, 8]]);
Rx.Observable.from(m).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: [1, 2]
Next: [2, 4]
Next: [4, 8]
Completed
// String
Rx.Observable.from("foo").subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: f
Next: o
Next: o
Completed
// Using a map function to manipulate the elements
Rx.Observable.from([1, 2, 3], function (x) { return x + x; }).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 2
Next: 4
Next: 6
Completed
// Generate a sequence of numbers
Rx.Observable.from({length: 5}, function(v, k) { return k; }).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed
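The inputs used in the from examples above (an array-like object, iterables, a String, and a map function that receives a value and an index) follow the same rules as the standard ES2015 Array.from function. The sketch below uses only Array.from, not RxJS. That Rx.Observable.from is modelled on Array.from is an assumption made here for illustration, not something this page states.

```javascript
// Standard ES2015 Array.from, shown only as an analogy for the inputs
// that the from examples above pass to Rx.Observable.from.

// Array-like object: anything with a length and indexed properties.
function toArray() {
  return Array.from(arguments);
}
console.log(toArray(1, 2, 3));

// Iterable: a Set yields its values, a Map yields [key, value] pairs.
console.log(Array.from(new Set(['foo', 'bar'])));
console.log(Array.from(new Map([[1, 2], [2, 4]])));

// String: treated as a sequence of characters.
console.log(Array.from('foo'));

// Map function: receives each value and its index.
const doubled = Array.from([1, 2, 3], x => x + x);
const indices = Array.from({ length: 5 }, (v, k) => k);
console.log(doubled, indices);
```

The last call shows why {length: 5} works as a sequence generator: the object has a length but no indexed values, so each value is undefined and only the index k is useful.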
from is found in the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
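The fromCallback operator takes a function that accepts a callback as its final
parameter, and returns a new function. Calling that new function returns an Observable that
emits the value passed to the callback as its single emission.
This operator also takes two additional, optional parameters: a context object to use as this when calling the function, and a selector function that transforms the arguments passed to the callback into the item to be emitted.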
var fs = require('fs'),
    Rx = require('rx');
// Wrap fs.exists
var exists = Rx.Observable.fromCallback(fs.exists);
// Check if file.txt exists
var source = exists('file.txt');
var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: true
Completed
fromCallback is found in the following distributions:
rx.all.js
rx.all.compat.js
rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js
There is also a fromNodeCallback operator, which is specialized for the types
of callback functions found in Node.js.
This operator takes three additional, optional parameters.
var fs = require('fs'),
    Rx = require('rx');
// Wrap fs.rename
var rename = Rx.Observable.fromNodeCallback(fs.rename);
// Rename file which returns no parameters except an error
var source = rename('file1.txt', 'file2.txt');
var subscription = source.subscribe(
  function () { console.log('Next: success!'); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: success!
Completed
fromNodeCallback is found in the following distributions:
rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js
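The difference between the two callback conventions can be sketched without RxJS. The following is a hand-rolled illustration, not the library's implementation. fromCallbackSketch, fromNodeCallbackSketch, isEven, and safeDivide are hypothetical names, and the optional parameters of the real operators are left out.

```javascript
// Hand-rolled sketch, not the RxJS source. An "observable" here is an
// object with a subscribe(onNext, onError, onCompleted) method.

// Plain callback convention: the callback receives the result directly.
function fromCallbackSketch(fn) {
  return function (...args) {
    return {
      subscribe(onNext, onError, onCompleted) {
        fn(...args, result => {
          onNext(result);
          onCompleted();
        });
      }
    };
  };
}

// Node.js convention: the callback receives (err, result), so a non-null
// first argument becomes an error notification instead of an item.
function fromNodeCallbackSketch(fn) {
  return function (...args) {
    return {
      subscribe(onNext, onError, onCompleted) {
        fn(...args, (err, result) => {
          if (err) {
            onError(err);
            return;
          }
          onNext(result);
          onCompleted();
        });
      }
    };
  };
}

// Hypothetical callback-style functions used only for this illustration.
function isEven(n, callback) { callback(n % 2 === 0); }
function safeDivide(a, b, callback) {
  if (b === 0) { callback(new Error('division by zero')); return; }
  callback(null, a / b);
}

const log = label => x => console.log(label + ': ' + x);
const done = () => console.log('Completed');
fromCallbackSketch(isEven)(4).subscribe(log('Next'), log('Error'), done);
fromNodeCallbackSketch(safeDivide)(1, 0).subscribe(log('Next'), log('Error'), done);
```

The first call reports an item and completes. The second routes the Error passed as the callback's first argument to the error handler, which is the behavior that makes a Node-specific variant useful.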
The fromEvent operator takes an “element” and an event name as
parameters, and it then listens for events of that name taking place on that element. It
returns an Observable that emits those events. An “element” may be a simple
DOM element, or a NodeList, jQuery element,
Zepto Element, Angular element, Ember.js element, or EventEmitter.
This operator also takes an optional third parameter: a function that accepts the arguments from the event handler as parameters and returns an item to be emitted by the resulting Observable in place of the event.
// using a jQuery element
var input = $('#input');
var source = Rx.Observable.fromEvent(input, 'click');
var subscription = source.subscribe(
  function (x) { console.log('Next: Clicked!'); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
input.trigger('click');
Next: Clicked!
// using a Node.js EventEmitter and the optional third parameter
var EventEmitter = require('events').EventEmitter,
    Rx = require('rx');
var eventEmitter = new EventEmitter();
var source = Rx.Observable.fromEvent(
  eventEmitter,
  'data',
  function (first, second) {
    return { foo: first, bar: second };
  });
var subscription = source.subscribe(
  function (x) {
    console.log('Next: foo - ' + x.foo + ', bar - ' + x.bar);
  },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
eventEmitter.emit('data', 'baz', 'quux');
Next: foo - baz, bar - quux
fromEvent is found in the following distributions:
rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js
The fromEventPattern operator is similar, except that instead of taking an
element and an event name as parameters, it takes two functions as parameters. The first
function attaches an event listener to a variety of events on a variety of elements; the
second function removes this set of listeners. In this way you can establish a single
Observable that emits items representing a variety of events and a variety of target elements.
var input = $('#input');
var source = Rx.Observable.fromEventPattern(
  function add (h) {
    input.bind('click', h);
  },
  function remove (h) {
    input.unbind('click', h);
  }
);
var subscription = source.subscribe(
  function (x) { console.log('Next: Clicked!'); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
input.trigger('click');
Next: Clicked!
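The add/remove pattern described for fromEventPattern can be sketched with Node's built-in EventEmitter. This is a hand-rolled illustration under stated assumptions, not the RxJS implementation. fromEventPatternSketch, sensorA, sensorB, and the event names are hypothetical. It shows one source fed by two different events on two different targets, and the remove function undoing exactly what the add function did.

```javascript
const { EventEmitter } = require('events');

// Hand-rolled sketch, not the RxJS source. subscribe returns a function
// that removes the handler again, standing in for disposing a subscription.
function fromEventPatternSketch(addHandler, removeHandler) {
  return {
    subscribe(onNext) {
      const handler = x => onNext(x);
      addHandler(handler);
      return () => removeHandler(handler);
    }
  };
}

// One observable fed by two event names on two emitters (hypothetical names).
const sensorA = new EventEmitter();
const sensorB = new EventEmitter();
const received = [];

const source = fromEventPatternSketch(
  h => { sensorA.on('reading', h); sensorB.on('alert', h); },
  h => { sensorA.removeListener('reading', h); sensorB.removeListener('alert', h); }
);

const dispose = source.subscribe(x => received.push(x));
sensorA.emit('reading', 21);
sensorB.emit('alert', 'overheat');
dispose();
sensorA.emit('reading', 22); // ignored: the handler has been removed

console.log(received);
```

Passing the same handler reference to both functions is what lets the remove function detach precisely the listeners that the add function attached.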
The of operator accepts a number of items as parameters, and returns an
Observable that emits each of these parameters, in order, as its emitted sequence.
var source = Rx.Observable.of(1,2,3);
var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 3
Completed
of is found in the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
A variant of this operator, called ofWithScheduler, takes a
Scheduler as its first parameter, and operates the resulting Observable
on this Scheduler.
There is also a fromPromise operator that converts a Promise into an Observable,
converting its resolve calls into onNext notifications, and its
reject calls into onError notifications.
fromPromise is found in the following distributions:
rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js
// A Promise that resolves
var promise = new RSVP.Promise(function (resolve, reject) {
  resolve(42);
});
var source = Rx.Observable.fromPromise(promise);
var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (e) { console.log('Error: ' + e); },
  function ( ) { console.log('Completed'); });
Next: 42
Completed
// A Promise that rejects
var promise = new RSVP.Promise(function (resolve, reject) {
  reject(new Error('reason'));
});
var source = Rx.Observable.fromPromise(promise);
var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (e) { console.log('Error: ' + e); },
  function ( ) { console.log('Completed'); });
Error: Error: reason
There is also an ofArrayChanges operator that monitors an Array with the
Array.observe method, and returns an Observable that emits any changes that take place in
the array. This operator is found only in the rx.all.js distribution.
var arr = [1,2,3];
var source = Rx.Observable.ofArrayChanges(arr);
var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (e) { console.log('Error: ' + e); },
  function ( ) { console.log('Completed'); });
arr.push(4);
Next: {type: "splice", object: Array[4], index: 3, removed: Array[0], addedCount: 1}
A similar operator is ofObjectChanges. It returns an Observable that emits any changes made
to a particular object, as reported by its Object.observe method. It is also found only in
the rx.all.js distribution.
var obj = {x: 1};
var source = Rx.Observable.ofObjectChanges(obj);
var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (e) { console.log('Error: ' + e); },
  function ( ) { console.log('Completed'); });
obj.x = 42;
Next: {type: "update", object: Object, name: "x", oldValue: 1}
There is also a pairs operator. This operator accepts an Object, and returns an Observable
that emits, as key/value pairs, the attributes of that object.
var obj = {
  foo: 42,
  bar: 56,
  baz: 78
};
var source = Rx.Observable.pairs(obj);
var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (e) { console.log('Error: ' + e); },
  function ( ) { console.log('Completed'); });
Next: ['foo', 42]
Next: ['bar', 56]
Next: ['baz', 78]
Completed
pairs is found in the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
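The key/value pairs that pairs emits can be pictured with plain JavaScript. The sketch below is an illustration only, not the RxJS source. pairsSketch is a hypothetical helper, and it assumes that only the object's own enumerable properties are of interest.

```javascript
// Sketch of the pairs idea with plain JavaScript; not the RxJS source.
// Each own enumerable property becomes one [key, value] item.
function pairsSketch(obj) {
  return Object.keys(obj).map(key => [key, obj[key]]);
}

const obj = { foo: 42, bar: 56, baz: 78 };
for (const [key, value] of pairsSketch(obj)) {
  console.log('Next: ' + key + ' = ' + value);
}
```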
RxPHP implements this operator as fromArray.
Converts an array to an observable sequence
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/fromArray/fromArray.php
$source = \Rx\Observable::fromArray([1, 2, 3, 4]);
$subscription = $source->subscribe($stdoutObserver);
//Next value: 1
//Next value: 2
//Next value: 3
//Next value: 4
//Complete!
Next value: 1
Next value: 2
Next value: 3
Next value: 4
Complete!
RxPHP also has an operator fromIterator.
Converts an Iterator into an observable sequence
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/iterator/iterator.php
$generator = function () {
    for ($i = 1; $i <= 3; $i++) {
        yield $i;
    }
    return 4;
};
$source = Rx\Observable::fromIterator($generator());
$source->subscribe($stdoutObserver);
Next value: 1
Next value: 2
Next value: 3
Next value: 4
Complete!
RxPHP also has an operator asObservable.
Hides the identity of an observable sequence.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/asObservable/asObservable.php
// Create subject
$subject = new \Rx\Subject\AsyncSubject();
// Send a value
$subject->onNext(42);
$subject->onCompleted();
// Hide its type
$source = $subject->asObservable();
$source->subscribe($stdoutObserver);
Next value: 42
Complete!
RxPHP also has an operator fromPromise.
Converts a promise into an observable
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/promise/fromPromise.php
$promise = \React\Promise\resolve(42);
$source = \Rx\Observable::fromPromise($promise);
$subscription = $source->subscribe($stdoutObserver);
Next value: 42
Complete!
In Swift, this is implemented using the Observable.from class method.
Each element of the array is produced as an emission. The difference between this method and Observable.just is that the latter emits the whole array as one emission.
let numbers = [1,2,3,4,5]
let source = Observable.from(numbers)
source.subscribe {
    print($0)
}
next(1)
next(2)
next(3)
next(4)
next(5)
completed