RxGroovy implements the groupBy operator. The Observable it returns emits items of a particular subclass of Observable, the GroupedObservable. Objects that implement the GroupedObservable interface have an additional method, getKey, by which you can retrieve the key by which items were designated for this particular GroupedObservable.
The following sample code uses groupBy to transform a list of numbers into two lists, grouped by whether or not the numbers are even:
Sample Code
import rx.Observable

def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
// Key function: true for even numbers, false for odd ones.
def groupFunc = { return(0 == (it % 2)); };
// Collapse each group into a single list that starts with the group's key.
numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete
Another version of groupBy allows you to pass in a transformative function that changes the elements before they are emitted by the resulting GroupedObservables.
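As a minimal sketch of this two-function variant, the example below groups numbers by evenness and multiplies each element by ten before its group emits it. It assumes RxGroovy 1.x (the rx.Observable API) is on the classpath. The names results, group, items, and pair are illustrative and not part of the library.

```groovy
import rx.Observable

// Collected results, keyed by each group's key (true = even, false = odd).
def results = [:]

// First closure passed to groupBy: the key selector.
// Second closure: transforms each element before its GroupedObservable emits it.
// flatMap then gathers each group into a list and pairs it with the group's key.
Observable.from([1, 2, 3, 4, 5, 6])
    .groupBy({ it % 2 == 0 }, { it * 10 })
    .flatMap({ group -> group.toList().map({ items -> [group.getKey(), items] }) })
    .subscribe({ pair -> results[pair[0]] = pair[1] })

println(results)
```

Storing the lists in a map keyed by group means the result does not depend on the order in which the groups complete.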
Note that when groupBy splits up the source Observable into an Observable that emits GroupedObservables, each of these GroupedObservables begins to buffer the items that it will emit upon subscription. If you ignore any of these GroupedObservables (you neither subscribe to it nor apply an operator to it that subscribes to it), this buffer presents a potential memory leak. For this reason, rather than ignoring a GroupedObservable that you have no interest in observing, you should instead apply an operator like take(0) to it as a way of signalling to it that it may discard its buffer.
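The following sketch shows one way to apply this advice: it keeps the group of even numbers and applies take(0) to the group of odd numbers instead of ignoring it. It assumes RxGroovy 1.x (the rx.Observable API) is on the classpath. The names seen and group are illustrative.

```groovy
import rx.Observable

// Items that actually reach the subscriber.
def seen = []

// Keep the "even" group (key == true) as it is. For the group we do not
// care about, apply take(0) rather than leaving it unsubscribed, so that
// it is free to discard whatever it has buffered.
Observable.from([1, 2, 3, 4, 5, 6])
    .groupBy({ it % 2 == 0 })
    .flatMap({ group -> group.getKey() ? group : group.take(0) })
    .subscribe({ seen << it })

println(seen)
```

Only the even numbers reach the subscriber. Every GroupedObservable for the odd numbers is subscribed to and released immediately.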
If you unsubscribe from one of the GroupedObservables, or if an operator like take that you apply to the GroupedObservable unsubscribes from it, that GroupedObservable will be terminated. If the source Observable later emits an item whose key matches the GroupedObservable that was terminated in this way, groupBy will create and emit a new GroupedObservable to match the key. In other words, unsubscribing from a GroupedObservable will not cause groupBy to swallow items from its group. For example, see the following code:
Sample Code
Observable.range(1,5)
.groupBy({ 0 })
.flatMap({ it.take(1) }) // "it" is the GroupedObservable passed to the closure
.subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
In the above code, the source Observable emits the sequence { 1 2 3 4 5 }. When it emits the first item in this sequence, the groupBy operator creates and emits a GroupedObservable with the key of 0. The flatMap operator applies the take(1) operator to that GroupedObservable, which passes along the item (1) for flatMap to emit and then unsubscribes from the GroupedObservable, which is terminated. When the source Observable emits the second item in its sequence, the groupBy operator creates and emits a second GroupedObservable with the same key (0) to replace the one that was terminated. flatMap again applies take(1) to this new GroupedObservable to retrieve the new item to emit (2) and to unsubscribe from and terminate the GroupedObservable, and this process repeats for the remaining items in the source sequence.
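To make this walkthrough observable, the sketch below records the key of every GroupedObservable that groupBy emits, alongside the items that reach the subscriber. It assumes RxGroovy 1.x (the rx.Observable API) is on the classpath and that groupBy re-creates terminated groups as described above. The names groupKeys, items, and group are illustrative.

```groovy
import rx.Observable

def groupKeys = []   // key of each GroupedObservable emitted by groupBy
def items = []       // items that reach the final subscriber

// doOnNext records the key of every GroupedObservable that groupBy emits.
// flatMap then takes a single item from each group, which unsubscribes
// from that group and terminates it.
Observable.range(1, 5)
    .groupBy({ 0 })
    .doOnNext({ group -> groupKeys << group.getKey() })
    .flatMap({ group -> group.take(1) })
    .subscribe({ items << it })

println("groups emitted: ${groupKeys.size()}")
println("items received: ${items}")
```

If groupBy behaves as described, one GroupedObservable is recorded per source item, all with the key 0, and no item is lost.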
groupBy does not by default operate on any particular Scheduler.