The GroupBy operator divides an Observable that emits items into an Observable that emits Observables, each one of which emits some subset of the items from the original source Observable. Which items end up on which Observable is typically decided by a discriminating function that evaluates each item and assigns it a key. All items with the same key are emitted by the same Observable.
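The keying idea can be illustrated without any Rx library. The following is only a synchronous, plain-JavaScript analogue, not an Rx operator; the name groupByKey and the use of a Map are assumptions made for this sketch.

```javascript
// Plain-JavaScript analogue of the grouping idea (not an Rx operator):
// a discriminating function assigns each item a key, and all items that
// share a key are collected in the same group.
function groupByKey(items, keySelector) {
  const groups = new Map(); // key -> items with that key, in arrival order
  for (const item of items) {
    const key = keySelector(item);
    if (!groups.has(key)) {
      groups.set(key, []);
    }
    groups.get(key).push(item);
  }
  return groups;
}

// Usage: key each number by whether it is even.
const parity = groupByKey([1, 2, 3, 4, 5, 6, 7, 8, 9], (n) => n % 2 === 0);
for (const [key, members] of parity) {
  console.log(key, members);
}
```

Here the odd numbers collect under the key false and the even numbers under the key true. A real groupBy emits each group as its own Observable as soon as the first item with that key arrives, rather than returning a finished collection.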
RxGroovy implements the groupBy operator. The Observable it returns emits items
of a particular subclass of Observable, the GroupedObservable. Objects that
implement the GroupedObservable interface have an additional method,
getKey, by which you can retrieve the key by which items were designated
for this particular GroupedObservable.
The following sample code uses groupBy to transform a list of numbers into two
lists, grouped by whether or not the numbers are even:
def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = { return(0 == (it % 2)); };
numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete
Another version of groupBy allows you to pass in a transformative function
that changes the elements before they are emitted by the resulting
GroupedObservables.
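To make the two-function variant concrete, here is a minimal library-free sketch in plain JavaScript. It is a synchronous analogue rather than the RxGroovy operator, and the names groupByWithElementSelector, keySelector, and elementSelector are chosen for this illustration only.

```javascript
// Plain-JavaScript analogue (not an Rx operator): the key selector decides
// which group an item belongs to, and the element selector transforms the
// item before it is placed in that group.
function groupByWithElementSelector(items, keySelector, elementSelector) {
  const groups = new Map(); // key -> transformed items, in arrival order
  for (const item of items) {
    const key = keySelector(item);
    if (!groups.has(key)) {
      groups.set(key, []);
    }
    groups.get(key).push(elementSelector(item));
  }
  return groups;
}

// Usage: group by parity, but store each number's square instead of the number.
const squaresByParity = groupByWithElementSelector(
  [1, 2, 3, 4],
  (n) => n % 2 === 0,
  (n) => n * n
);
console.log(squaresByParity.get(false)); // squares of the odd inputs
console.log(squaresByParity.get(true));  // squares of the even inputs
```

Note that the key is computed from the original item, not from the transformed one.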
Note that when groupBy splits up the source Observable into an Observable that
emits GroupedObservables, each of these GroupedObservables begins to
buffer the items that it will emit upon subscription. If you ignore any of
these GroupedObservables (you neither subscribe to it nor apply an operator to
it that subscribes to it), this buffer is a potential memory leak. So,
rather than ignoring a GroupedObservable that you have no interest in observing,
you should instead apply an operator like take(0) to it
as a way of signalling to it that it may discard its buffer.
If you unsubscribe from one of the GroupedObservables, or if an operator like
take that you apply to the GroupedObservable unsubscribes from it,
that GroupedObservable will be terminated. If the source Observable later emits an
item whose key matches the GroupedObservable that was terminated in this way,
groupBy will create and emit a new GroupedObservable to match
the key. In other words, unsubscribing from a GroupedObservable will not
cause groupBy to swallow items from its group. For example, see the following code:
Observable.range(1,5)
  .groupBy({ 0 })
  .flatMap({ it.take(1) })
  .subscribe(
    { println(it); },                          // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence complete"); }          // onCompleted
  );
1
2
3
4
5
In the above code, the source Observable emits the sequence { 1 2 3 4 5 }. When it emits
the first item in this sequence, the groupBy operator creates and emits a
GroupedObservable with the key of 0. The flatMap operator
applies the take(1) operator to that GroupedObservable. take(1) passes along
the first item (1) and then unsubscribes from the GroupedObservable,
which is terminated. When the source Observable emits the second item in its sequence, the
groupBy operator creates and emits a second GroupedObservable with
the same key (0) to replace the one that was terminated. flatMap again applies
take(1) to this new GroupedObservable to retrieve the new item to emit
(2) and to unsubscribe from and terminate the GroupedObservable. This
process repeats for the remaining items in the source sequence.
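The replacement behaviour described above can be modelled with a small plain-JavaScript simulation. This is a toy model, not Rx code: the function simulateGroupByTake and its limit parameter are hypothetical, and a group is simply treated as terminated once it has received limit items, standing in for what take(limit) does to a GroupedObservable.

```javascript
// Toy simulation (not Rx code). A group that has received `limit` items is
// treated as terminated, so the next item with the same key opens a
// replacement group instead of being swallowed.
function simulateGroupByTake(items, keySelector, limit) {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError('limit must be a positive integer');
  }
  const openGroups = new Map(); // key -> group that is still accepting items
  const allGroups = [];         // every group, in the order it was opened
  for (const item of items) {
    const key = keySelector(item);
    let group = openGroups.get(key);
    if (group === undefined) {
      group = { key: key, items: [] };
      openGroups.set(key, group);
      allGroups.push(group);
    }
    group.items.push(item);
    if (group.items.length === limit) {
      openGroups.delete(key); // terminated: a later item reopens this key
    }
  }
  return allGroups;
}

// Usage: every item has key 0 and each group takes one item, as in the sample.
const groups = simulateGroupByTake([1, 2, 3, 4, 5], () => 0, 1);
console.log(groups.map((g) => g.items));
```

In this model the five source items produce five groups that all share the key 0, each holding a single item, and no item is lost.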
groupBy does not by default operate on any particular
Scheduler.
groupBy(Func1)
groupBy(Func1,Func1)
RxJava implements the groupBy operator. The Observable it returns emits items
of a particular subclass of Observable, the GroupedObservable. Objects
that implement the GroupedObservable interface have an additional method,
getKey, by which you can retrieve the key by which items were designated
for this particular GroupedObservable.
Another version of groupBy allows you to pass in a transformative function
that changes the elements before they are emitted by the resulting
GroupedObservables.
Note that when groupBy splits up the source Observable into an Observable that
emits GroupedObservables, each of these GroupedObservables begins to
buffer the items that it will emit upon subscription. If you ignore any of
these GroupedObservables (you neither subscribe to it nor apply an operator to
it that subscribes to it), this buffer is a potential memory leak. So,
rather than ignoring a GroupedObservable that you have no interest in observing,
you should instead apply an operator like take(0) to it
as a way of signalling to it that it may discard its buffer.
If you unsubscribe from one of the GroupedObservables, that
GroupedObservable will be terminated. If the source Observable later emits an
item whose key matches the GroupedObservable that was terminated in this way,
groupBy will create and emit a new GroupedObservable to match the
key.
groupBy does not by default operate on any particular
Scheduler.
groupBy(Func1)
groupBy(Func1,Func1)
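RxJS implements groupBy. It takes one to three parameters. The sample below passes two of
them: a function that returns the key for each item, and a function that returns the
element to emit in that item's place: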
var codes = [
  { keyCode: 38 }, // up
  { keyCode: 38 }, // up
  { keyCode: 40 }, // down
  { keyCode: 40 }, // down
  { keyCode: 37 }, // left
  { keyCode: 39 }, // right
  { keyCode: 37 }, // left
  { keyCode: 39 }, // right
  { keyCode: 66 }, // b
  { keyCode: 65 }  // a
];
var source = Rx.Observable.fromArray(codes)
  .groupBy(
    function (x) { return x.keyCode; },
    function (x) { return x.keyCode; });
var subscription = source.subscribe(
  function (obs) {
    // Print the count
    obs.count().subscribe(function (x) {
      console.log('Count: ' + x);
    });
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });
Count: 2
Count: 2
Count: 2
Count: 2
Count: 1
Count: 1
Completed
groupBy is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.coincidence.js
RxJS also implements groupByUntil. It monitors an additional Observable, and
whenever that Observable emits an item, it closes any of the keyed Observables it has opened
(it will open new ones if additional items are emitted by the source Observable that match
the key). groupByUntil takes from two to four parameters. The sample below passes three:
a key function, an element function, and a function that returns the Observable that
limits each group's lifetime:
var codes = [
  { keyCode: 38 }, // up
  { keyCode: 38 }, // up
  { keyCode: 40 }, // down
  { keyCode: 40 }, // down
  { keyCode: 37 }, // left
  { keyCode: 39 }, // right
  { keyCode: 37 }, // left
  { keyCode: 39 }, // right
  { keyCode: 66 }, // b
  { keyCode: 65 }  // a
];
var source = Rx.Observable
  .for(codes, function (x) { return Rx.Observable.return(x).delay(1000); })
  .groupByUntil(
    function (x) { return x.keyCode; },
    function (x) { return x.keyCode; },
    function (x) { return Rx.Observable.timer(2000); });
var subscription = source.subscribe(
  function (obs) {
    // Print the count
    obs.count().subscribe(function (x) { console.log('Count: ' + x); });
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });
Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed
groupByUntil is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.coincidence.js
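The effect of a per-group time limit can be approximated with a library-free simulation over virtual timestamps. This is a toy model, not RxJS code. It assumes that the sample's items arrive 1000 ms apart, that each group stays open for 2000 ms after it is opened, and that an item arriving exactly at a group's closing time finds that group already closed; the function simulateGroupByUntil is hypothetical.

```javascript
// Toy simulation (not RxJS code) of groups that expire after a fixed duration.
// Each input is { time, value }, where `time` is a virtual timestamp in ms.
function simulateGroupByUntil(timedItems, keySelector, durationMs) {
  const openGroups = new Map(); // key -> most recently opened group for that key
  const allGroups = [];         // every group, in the order it was opened
  for (const { time, value } of timedItems) {
    const key = keySelector(value);
    let group = openGroups.get(key);
    // Assumption: an item arriving exactly at closesAt finds the group closed.
    if (group === undefined || time >= group.closesAt) {
      group = { key: key, closesAt: time + durationMs, count: 0 };
      openGroups.set(key, group);
      allGroups.push(group);
    }
    group.count += 1;
  }
  return allGroups;
}

// Usage: the key codes from the sample, assumed to arrive one second apart.
const keyCodes = [38, 38, 40, 40, 37, 39, 37, 39, 66, 65];
const timed = keyCodes.map((keyCode, i) => ({
  time: (i + 1) * 1000,
  value: { keyCode: keyCode }
}));
const counts = simulateGroupByUntil(timed, (x) => x.keyCode, 2000)
  .map((g) => g.count);
console.log(counts);
```

Under these assumptions the per-group counts are 2, 2, 1, 1, 1, 1, 1, 1, the same counts as in the sample output: the repeated 38 and 40 arrive while their groups are still open, whereas the second 37 and the second 39 arrive after their groups have closed and therefore open new ones.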
RxPHP implements this operator as groupBy.
Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupBy.php
$observable = \Rx\Observable::fromArray([21, 42, 21, 42, 21, 42]);
$observable
    ->groupBy(
        function ($elem) {
            if ($elem === 42) {
                return 0;
            }
            return 1;
        },
        null,
        function ($key) {
            return $key;
        }
    )
    ->subscribeCallback(function ($groupedObserver) use ($createStdoutObserver) {
        $groupedObserver->subscribe($createStdoutObserver($groupedObserver->getKey() . ": "));
    });
1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Complete!
0: Complete!
RxPHP also has an operator groupByUntil.
Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupByUntil.php
use Rx\Observer\CallbackObserver;
$loop = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);
$codes = [
    ['id' => 38],
    ['id' => 38],
    ['id' => 40],
    ['id' => 40],
    ['id' => 37],
    ['id' => 39],
    ['id' => 37],
    ['id' => 39],
    ['id' => 66],
    ['id' => 65]
];
$source = \Rx\Observable
    ::fromArray($codes)
    ->concatMap(function ($x) {
        return \Rx\Observable::timer(100)->mapTo($x);
    })
    ->groupByUntil(
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return \Rx\Observable::timer(200);
        });
$subscription = $source->subscribe(new CallbackObserver(
    function (\Rx\Observable $obs) {
        // Print the count
        $obs->count()->subscribe(new CallbackObserver(
            function ($x) {
                echo 'Count: ', $x, PHP_EOL;
            }));
    },
    function (Exception $err) {
        echo 'Error', $err->getMessage(), PHP_EOL;
    },
    function () {
        echo 'Completed', PHP_EOL;
    }), $scheduler);
$loop->run();
Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed
RxPHP also has an operator partition.
Returns two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/partition/partition.php
list($evens, $odds) = \Rx\Observable::range(0, 10)
    ->partition(function ($x) {
        return $x % 2 === 0;
    });
$evens->subscribe($createStdoutObserver('Evens '));
$odds->subscribe($createStdoutObserver('Odds '));
Evens Next value: 0
Evens Next value: 2
Evens Next value: 4
Evens Next value: 6
Evens Next value: 8
Evens Complete!
Odds Next value: 1
Odds Next value: 3
Odds Next value: 5
Odds Next value: 7
Odds Next value: 9
Odds Complete!
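The two-way split performed by partition can be sketched in plain JavaScript. This is a synchronous analogue over an array, not the RxPHP operator: the name partitionByPredicate is an assumption, and unlike the operator described above it evaluates the predicate once per item rather than once per subscribed observer.

```javascript
// Plain-JavaScript analogue of partition (not the RxPHP operator): items for
// which the predicate returns true go to the first array, all others to the
// second array.
function partitionByPredicate(items, predicate) {
  const matching = [];
  const rest = [];
  for (const item of items) {
    if (predicate(item)) {
      matching.push(item);
    } else {
      rest.push(item);
    }
  }
  return [matching, rest];
}

// Usage: split the integers 0 through 9 into evens and odds.
const [evens, odds] = partitionByPredicate(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  (x) => x % 2 === 0
);
console.log('Evens', evens);
console.log('Odds', odds);
```

A partition is equivalent to a groupBy whose key function can only return true or false, with the two groups handed back directly instead of being emitted.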