To

convert an Observable into another object or data structure

To

The various language-specific implementations of ReactiveX have a variety of operators that you can use to convert an Observable, or a sequence of items emitted by an Observable, into another variety of object or data structure. Some of these block until the Observable terminates and then produce an equivalent object or data structure; others return an Observable that emits such an object or data structure.

In some implementations of ReactiveX, there is also an operator that converts an Observable into a “Blocking” Observable. A Blocking Observable extends the ordinary Observable by providing a set of methods, operating on the items emitted by the Observable, that block. Some of the To operators are in this Blocking Obsevable set of extended operations.

See Also

Language-Specific Information:

getIterator

The getIterator operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.

This operator converts an Observable into an Iterator with which you can iterate over the set of items emitted by the source Observable.

toFuture

The toFuture operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.

This operator converts an Observable into an Future that will return the single item emitted by the source Observable. If the source Observable emits more than one item, the Future will receive an IllegalArgumentException; if it completes after emitting no items, the Future will receive a NoSuchElementException.

If you want to convert an Observable that may emit multiple items into a Future, try something like this: myObservable.toList().toBlocking().toFuture().

toIterable

The toIterable operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.

This operator converts an Observable into an Iterable with which you can iterate over the set of items emitted by the source Observable.

toList

Normally, an Observable that emits multiple items will do so by invoking its observer’s onNext method for each such item. You can change this behavior, instructing the Observable to compose a list of these multiple items and then to invoke the observer’s onNext method only once, passing it the entire list, by applying the toList operator to the Observable.

For example, the following rather pointless code takes a list of integers, converts it into an Observable, then converts that Observable into one that emits the original list as a single item:

Sample Code

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

numbers.toList().subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[1, 2, 3, 4, 5, 6, 7, 8, 9]
Sequence complete

If the source Observable invokes onCompleted before emitting any items, the Observable returned by toList will emit an empty list before invoking onCompleted. If the source Observable invokes onError, the Observable returned by toList will immediately invoke the onError methods of its observers.

toList does not by default operate on any particular Scheduler.

toMap

The toMap operator collects the items emitted by the source Observable into a map (by default, a HashMap, but you can optionally supply a factory function that generates another Map variety) and then emits that map. You supply a function that generates the key for each emitted item. You may also optionally supply a function that converts an emitted item into the value to be stored in the map (by default, the item itself is this value).

toMap does not by default operate on any particular Scheduler.

toMultiMap

The toMultiMap operator is similar to toMap except that the map it generates is also an ArrayList (by default; or you can pass an optional factory method as a fourth parameter by which you generate the variety of collection you prefer).

toMultiMap does not by default operate on any particular Scheduler.

toSortedList

The toSortedList operator behaves much like toList except that it sorts the resulting list. By default it sorts the list naturally in ascending order by means of the Comparable interface. If any of the items emitted by the Observable does not support Comparable with respect to the type of every other item emitted by the Observable, toSortedList will throw an exception. However, you can change this default behavior by also passing in to toSortedList a function that takes as its parameters two items and returns a number; toSortedList will then use that function instead of Comparable to sort the items.

For example, the following code takes a list of unsorted integers, converts it into an Observable, then converts that Observable into one that emits the original list in sorted form as a single item:

Sample Code

numbers = Observable.from([8, 6, 4, 2, 1, 3, 5, 7, 9]);

numbers.toSortedList().subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[1, 2, 3, 4, 5, 6, 7, 8, 9]
Sequence complete

Here is an example that provides its own sorting function: in this case, one that sorts numbers according to how close they are to the number 5.

numbers = Observable.from([8, 6, 4, 2, 1, 3, 5, 7, 9]);

numbers.toSortedList({ n, m -> Math.abs(5-n) - Math.abs(5-m) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[5, 6, 4, 3, 7, 8, 2, 1, 9]
Sequence complete

toSortedList does not by default operate on any particular Scheduler.

nest

RxGroovy also has a nest operator that has one particular purpose: it converts a source Observable into an Observable that emits that source Observable as its sole item.

getIterator

The getIterator operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.

This operator converts an Observable into an Iterator with which you can iterate over the set of items emitted by the source Observable.

toFuture

The toFuture operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.

This operator converts an Observable into an Future that will return the single item emitted by the source Observable. If the source Observable emits more than one item, the Future will receive an IllegalArgumentException; if it completes after emitting no items, the Future will receive a NoSuchElementException.

If you want to convert an Observable that may emit multiple items into a Future, try something like this: myObservable.toList().toBlocking().toFuture().

toIterable

The toIterable operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.

This operator converts an Observable into an Iterable with which you can iterate over the set of items emitted by the source Observable.

toList

Normally, an Observable that emits multiple items will do so by invoking its observer’s onNext method for each such item. You can change this behavior, instructing the Observable to compose a list of these multiple items and then to invoke the observer’s onNext method only once, passing it the entire list, by applying the toList operator to the Observable.

If the source Observable invokes onCompleted before emitting any items, the Observable returned by toList will emit an empty list before invoking onCompleted. If the source Observable invokes onError, the Observable returned by toList will immediately invoke the onError methods of its observers.

toList does not by default operate on any particular Scheduler.

toMap

The toMap operator collects the items emitted by the source Observable into a map (by default, a HashMap, but you can optionally supply a factory function that generates another Map variety) and then emits that map. You supply a function that generates the key for each emitted item. You may also optionally supply a function that converts an emitted item into the value to be stored in the map (by default, the item itself is this value).

toMap does not by default operate on any particular Scheduler.

toMultiMap

The toMultiMap operator is similar to toMap except that the map it generates is also an ArrayList (by default; or you can pass an optional factory method as a fourth parameter by which you generate the variety of collection you prefer).

toMultiMap does not by default operate on any particular Scheduler.

toSortedList

The toSortedList operator behaves much like toList except that it sorts the resulting list. By default it sorts the list naturally in ascending order by means of the Comparable interface. If any of the items emitted by the Observable does not support Comparable with respect to the type of every other item emitted by the Observable, toSortedList will throw an exception. However, you can change this default behavior by also passing in to toSortedList a function that takes as its parameters two items and returns a number; toSortedList will then use that function instead of Comparable to sort the items.

toSortedList does not by default operate on any particular Scheduler.

nest

RxJava also has a nest operator that has one particular purpose: it converts a source Observable into an Observable that emits that source Observable as its sole item.

toArray

Normally, an Observable that emits multiple items will do so by invoking its observer’s onNext method for each such item. You can change this behavior, instructing the Observable to compose an array of these multiple items and then to invoke the observer’s onNext method only once, passing it the entire array, by applying the toArray operator to the Observable.

Sample Code

var source = Rx.Observable.timer(0, 1000)
    .take(5)
    .toArray();

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: [0,1,2,3,4]
Completed

toArray is found in each of the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js
toMap

The toMap operator collects the items emitted by the source Observable into a Map and then emits that map. You supply a function that generates the key for each emitted item. You may also optionally supply a function that converts an emitted item into the value to be stored in the map (by default, the item itself is this value).

Sample Code

var source = Rx.Observable.timer(0, 1000)
    .take(5)
    .toMap(function (x) { return x * 2; }, function (x) { return x * 4; });

var subscription = source.subscribe(
    function (x) {
        var arr = [];
        x.forEach(function (value, key) { arr.push(value, key); })
        console.log('Next: ' + arr);
    },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: [0,0,2,4,4,8,6,12,8,16]
Completed

toMap is found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.aggregates.js
toSet

Normally, an Observable that emits multiple items will do so by invoking its observer’s onNext method for each such item. You can change this behavior, instructing the Observable to compose a Set of these multiple items and then to invoke the observer’s onNext method only once, passing it the entire Set, by applying the toSet operator to the Observable.

Note that this only works in an ES6 environment or polyfilled.

var source = Rx.Observable.timer(0, 1000)
    .take(5)
    .toSet();

var subscription = source.subscribe(
    function (x) {
        var arr = [];
        x.forEach(function (i) { arr.push(i); })
        console.log('Next: ' + arr);
    },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: [0,1,2,3,4]
Completed

toSet is found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.aggregates.js

RxPHP implements this operator as toArray.

Creates an observable sequence containing a single element which is an array containing all the elements of the source sequence.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/toArray/toArray.php

$source = \Rx\Observable::fromArray([1, 2, 3, 4]);

$observer = $createStdoutObserver();

$subscription = $source->toArray()
    ->subscribe(new CallbackObserver(
        function ($array) use ($observer) {
            $observer->onNext(json_encode($array));
        },
        [$observer, "onError"],
        [$observer, "onCompleted"]
    ));

   
Next value: [1,2,3,4]
Complete!