The GroupBy operator divides an Observable that emits items into an Observable that emits Observables, each one of which emits some subset of the items from the original source Observable. Which items end up on which Observable is typically decided by a discriminating function that evaluates each item and assigns it a key. All items with the same key are emitted by the same Observable.
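The keying rule described above can be sketched in plain JavaScript without any Rx library. This is only an illustration of how a discriminating function assigns items to groups; `groupItems` and `keyOf` are hypothetical names invented for this sketch and are not part of any Rx API.

```javascript
// Plain-JavaScript sketch (not an Rx API): route each item to the group
// selected by a key function. `groupItems` and `keyOf` are hypothetical names.
function groupItems(items, keyOf) {
  const groups = new Map(); // key -> items that share that key, in arrival order
  for (const item of items) {
    const key = keyOf(item);
    if (!groups.has(key)) {
      groups.set(key, []); // the first item with this key opens a new group
    }
    groups.get(key).push(item);
  }
  return groups;
}

// Group the numbers 1..9 by whether they are even.
const byParity = groupItems([1, 2, 3, 4, 5, 6, 7, 8, 9], (n) => n % 2 === 0);
for (const [key, members] of byParity) {
  console.log(key, members);
}
```

Unlike an Observable, this sketch collects every item up front; it mirrors only the rule that items sharing a key land in the same group.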
RxGroovy implements the groupBy operator. The Observable it returns emits items of a particular subclass of Observable, the GroupedObservable. Objects that implement the GroupedObservable interface have an additional method, getKey, by which you can retrieve the key by which items were designated for this particular GroupedObservable.
The following sample code uses groupBy to transform a list of numbers into two lists, grouped by whether or not the numbers are even:
def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = { return(0 == (it % 2)); };

numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete
Another version of groupBy allows you to pass in a transformative function that changes the elements before they are emitted by the resulting GroupedObservables.
Note that when groupBy splits up the source Observable into an Observable that emits GroupedObservables, each of these GroupedObservables begins to buffer the items that it will emit upon subscription. For this reason, if you ignore any of these GroupedObservables (you neither subscribe to it nor apply an operator to it that subscribes to it), this buffer will present a potential memory leak. So rather than ignoring a GroupedObservable that you have no interest in observing, you should instead apply an operator like take(0) to it as a way of signalling to it that it may discard its buffer.
If you unsubscribe from one of the GroupedObservables, or if an operator like take that you apply to the GroupedObservable unsubscribes from it, that GroupedObservable will be terminated. If the source Observable later emits an item whose key matches the GroupedObservable that was terminated in this way, groupBy will create and emit a new GroupedObservable to match the key. In other words, unsubscribing from a GroupedObservable will not cause groupBy to swallow items from its group. For example, see the following code:
Observable.range(1,5)
          .groupBy({ 0 })
          .flatMap({ it.take(1) })   // `it` is the GroupedObservable passed to the closure
          .subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
2
3
4
5
In the above code, the source Observable emits the sequence { 1 2 3 4 5 }. When it emits the first item in this sequence, the groupBy operator creates and emits a GroupedObservable with the key of 0. The flatMap operator applies the take(1) operator to that GroupedObservable; take(1) passes along the item (1) for flatMap to emit and then unsubscribes from the GroupedObservable, which is terminated. When the source Observable emits the second item in its sequence, the groupBy operator creates and emits a second GroupedObservable with the same key (0) to replace the one that was terminated. flatMap again applies take(1) to this new GroupedObservable to retrieve the new item to emit (2) and to unsubscribe from and terminate the GroupedObservable, and this process repeats for the remaining items in the source sequence.
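The replacement behavior described above can be modeled with a small plain-JavaScript simulation. It is a sketch of the described behavior only, not of RxGroovy internals: `simulateGroupByTakeOne` is a hypothetical helper, and the `remaining` counter stands in for take(1).

```javascript
// Simulation only: models the behavior described in the text, not the
// library's implementation. `simulateGroupByTakeOne` is a hypothetical name.
function simulateGroupByTakeOne(items, keyOf) {
  const active = new Map(); // key -> currently live group
  const emitted = [];       // items the downstream subscriber would receive
  let groupsCreated = 0;

  for (const item of items) {
    const key = keyOf(item);
    let group = active.get(key);
    if (group === undefined) {
      // No live group for this key, so a new one is created and emitted.
      group = { key, remaining: 1 }; // `remaining` models take(1)
      active.set(key, group);
      groupsCreated += 1;
    }
    emitted.push(item);
    group.remaining -= 1;
    if (group.remaining === 0) {
      // take(1) is satisfied: it unsubscribes and the group is terminated.
      active.delete(key);
    }
  }
  return { emitted, groupsCreated };
}

const result = simulateGroupByTakeOne([1, 2, 3, 4, 5], () => 0);
console.log(result.emitted.join(' '));
console.log('groups created: ' + result.groupsCreated);
```

In this model every source item passes through, and each one triggers a fresh group for key 0, which is the sense in which unsubscribing does not swallow later items.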
groupBy does not by default operate on any particular Scheduler.
groupBy(Func1)
groupBy(Func1,Func1)
RxJava implements the groupBy operator. The Observable it returns emits items of a particular subclass of Observable, the GroupedObservable. Objects that implement the GroupedObservable interface have an additional method, getKey, by which you can retrieve the key by which items were designated for this particular GroupedObservable.
Another version of groupBy allows you to pass in a transformative function that changes the elements before they are emitted by the resulting GroupedObservables.
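As a rough illustration of this two-function variant, the following plain-JavaScript sketch applies one function to choose the key and a second to transform each element before it is placed in its group. `groupItemsWith`, `keyOf`, and `elementOf` are hypothetical names for this sketch, and the word list is made-up sample data.

```javascript
// Plain-JavaScript sketch (not an Rx API): one function picks the key,
// another transforms the element before it joins its group.
function groupItemsWith(items, keyOf, elementOf) {
  const groups = new Map(); // key -> transformed elements, in arrival order
  for (const item of items) {
    const key = keyOf(item);
    if (!groups.has(key)) {
      groups.set(key, []);
    }
    groups.get(key).push(elementOf(item)); // transformed before grouping
  }
  return groups;
}

// Group words by first letter, keeping only each word's length.
const words = ['apple', 'avocado', 'banana', 'blueberry', 'cherry'];
const lengthsByInitial = groupItemsWith(words, (w) => w[0], (w) => w.length);
for (const [initial, lengths] of lengthsByInitial) {
  console.log(initial, lengths);
}
```

The key is still computed from the original item, so the transformation does not affect which group an element joins.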
Note that when groupBy splits up the source Observable into an Observable that emits GroupedObservables, each of these GroupedObservables begins to buffer the items that it will emit upon subscription. For this reason, if you ignore any of these GroupedObservables (you neither subscribe to it nor apply an operator to it that subscribes to it), this buffer will present a potential memory leak. So rather than ignoring a GroupedObservable that you have no interest in observing, you should instead apply an operator like take(0) to it as a way of signalling to it that it may discard its buffer.
If you unsubscribe from one of the GroupedObservables, that GroupedObservable will be terminated. If the source Observable later emits an item whose key matches the GroupedObservable that was terminated in this way, groupBy will create and emit a new GroupedObservable to match the key.
groupBy does not by default operate on any particular Scheduler.
groupBy(Func1)
groupBy(Func1,Func1)
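RxJS implements groupBy. It takes one to three parameters; the following sample passes two of them, a function that selects the key and a function that selects the element to emit: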
var codes = [
    { keyCode: 38}, // up
    { keyCode: 38}, // up
    { keyCode: 40}, // down
    { keyCode: 40}, // down
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 66}, // b
    { keyCode: 65}  // a
];

var source = Rx.Observable.fromArray(codes)
    .groupBy(
        function (x) { return x.keyCode; },
        function (x) { return x.keyCode; });

var subscription = source.subscribe(
    function (obs) {
        // Print the count
        obs.count().subscribe(function (x) {
            console.log('Count: ' + x);
        });
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Count: 2
Count: 2
Count: 2
Count: 2
Count: 1
Count: 1
Completed
groupBy is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.coincidence.js
RxJS also implements groupByUntil. It monitors an additional Observable, and whenever that Observable emits an item, it closes any of the keyed Observables it has opened (it will open new ones if additional items are emitted by the source Observable that match the key). groupByUntil takes from two to four parameters; the following sample passes three of them, a key function, an element function, and a function that returns the Observable to monitor:
var codes = [
    { keyCode: 38}, // up
    { keyCode: 38}, // up
    { keyCode: 40}, // down
    { keyCode: 40}, // down
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 66}, // b
    { keyCode: 65}  // a
];

var source = Rx.Observable
    .for(codes, function (x) { return Rx.Observable.return(x).delay(1000); })
    .groupByUntil(
        function (x) { return x.keyCode; },
        function (x) { return x.keyCode; },
        function (x) { return Rx.Observable.timer(2000); });

var subscription = source.subscribe(
    function (obs) {
        // Print the count
        obs.count().subscribe(function (x) {
            console.log('Count: ' + x);
        });
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed
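To see why the counts above differ from plain groupBy, it can help to replay the sample with virtual timestamps instead of real timers. The sketch below is a plain-JavaScript simulation, not RxJS itself: `simulateGroupByUntil` is a hypothetical helper, and it assumes that a group whose lifetime has elapsed is closed before an item arriving at that same instant is routed.

```javascript
// Simulation with virtual timestamps; not RxJS itself.
// `simulateGroupByUntil` is a hypothetical name used for illustration.
function simulateGroupByUntil(timedItems, keyOf, lifetimeMs) {
  const open = new Map(); // key -> { openedAt, count }
  const counts = [];      // item counts of groups, in the order they close

  function closeExpired(now) {
    for (const [key, group] of open) {
      if (now - group.openedAt >= lifetimeMs) {
        counts.push(group.count);
        open.delete(key); // deleting during Map iteration is safe
      }
    }
  }

  for (const { at, value } of timedItems) {
    // Assumption: expired groups close before this item is routed.
    closeExpired(at);
    const key = keyOf(value);
    if (!open.has(key)) {
      open.set(key, { openedAt: at, count: 0 }); // open a new group
    }
    open.get(key).count += 1;
  }

  // When the source completes, the groups still open complete too.
  for (const group of open.values()) {
    counts.push(group.count);
  }
  return counts;
}

// One key code per second, each group living for two seconds.
const keyCodes = [38, 38, 40, 40, 37, 39, 37, 39, 66, 65];
const timed = keyCodes.map((keyCode, i) => ({ at: (i + 1) * 1000, value: { keyCode } }));
const groupCounts = simulateGroupByUntil(timed, (x) => x.keyCode, 2000);
console.log(groupCounts.join(' '));
```

Under these assumptions the two adjacent 38s and 40s share a group, while the second 37 and 39 arrive after their first groups have closed and so open new ones.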
groupByUntil is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.coincidence.js
RxPHP implements this operator as groupBy.
Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupBy.php

$observable = \Rx\Observable::fromArray([21, 42, 21, 42, 21, 42]);
$observable
    ->groupBy(
        function ($elem) {
            if ($elem === 42) {
                return 0;
            }

            return 1;
        },
        null,
        function ($key) {
            return $key;
        }
    )
    ->subscribe(function ($groupedObserver) use ($createStdoutObserver) {
        $groupedObserver->subscribe($createStdoutObserver($groupedObserver->getKey() . ": "));
    });
1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Complete!
0: Complete!
RxPHP also has an operator groupByUntil.
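Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function. In the following sample, a third function supplies an Observable for each group, and the group is closed when that Observable emits.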
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupByUntil.php

$codes = [
    ['id' => 38],
    ['id' => 38],
    ['id' => 40],
    ['id' => 40],
    ['id' => 37],
    ['id' => 39],
    ['id' => 37],
    ['id' => 39],
    ['id' => 66],
    ['id' => 65]
];

$source = Rx\Observable::fromArray($codes)
    ->concatMap(function ($x) {
        return \Rx\Observable::timer(100)->mapTo($x);
    })
    ->groupByUntil(
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return Rx\Observable::timer(200);
        });

$subscription = $source->subscribe(new CallbackObserver(
    function (\Rx\Observable $obs) {
        // Print the count
        $obs->count()->subscribe(new CallbackObserver(
            function ($x) {
                echo 'Count: ', $x, PHP_EOL;
            }));
    },
    function (Throwable $err) {
        echo 'Error', $err->getMessage(), PHP_EOL;
    },
    function () {
        echo 'Completed', PHP_EOL;
    }));
Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed
RxPHP also has an operator partition.
Returns two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/partition/partition.php

list($evens, $odds) = \Rx\Observable::range(0, 10, \Rx\Scheduler::getImmediate())
    ->partition(function ($x) {
        return $x % 2 === 0;
    });

//Because we used the immediate scheduler with range, the subscriptions are not asynchronous.
$evens->subscribe($createStdoutObserver('Evens '));
$odds->subscribe($createStdoutObserver('Odds '));
Evens Next value: 0
Evens Next value: 2
Evens Next value: 4
Evens Next value: 6
Evens Next value: 8
Evens Complete!
Odds Next value: 1
Odds Next value: 3
Odds Next value: 5
Odds Next value: 7
Odds Next value: 9
Odds Complete!
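The true/false split that partition performs can be sketched over a plain array in JavaScript. This is not the RxPHP operator: `partitionItems` is a hypothetical helper, and unlike the operator described above it evaluates the predicate once per item rather than once per subscribed observer.

```javascript
// Plain-JavaScript sketch (not the RxPHP operator): split items into those
// that satisfy a predicate and those that do not.
// `partitionItems` is a hypothetical name used for illustration.
function partitionItems(items, predicate) {
  const matching = []; // items for which the predicate returns true
  const rest = [];     // items for which the predicate returns false
  for (const item of items) {
    if (predicate(item)) {
      matching.push(item);
    } else {
      rest.push(item);
    }
  }
  return [matching, rest];
}

// The integers 0..9, split by evenness.
const range = Array.from({ length: 10 }, (_, i) => i);
const [evens, odds] = partitionItems(range, (x) => x % 2 === 0);
console.log('Evens', evens);
console.log('Odds', odds);
```

Every item lands in exactly one of the two results, which matches the way the two Observables in the sample divide the source between them.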