You can emit only the first n items emitted by an Observable and then complete while ignoring the remainder, by modifying the Observable with the Take operator.
take
TBD
TBD
In RxGroovy, this operator is implemented as take
.
If you use the take(n)
operator (or its synonym, limit(n)
) on an
Observable, and that Observable emits fewer than n items before completing, the new,
take
-modified Observable will not throw an exception or invoke onError
, but
will merely emit this same fewer number of items before it completes.
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8]); numbers.take(3).subscribe( { println(it); }, // onNext { println("Error: " + it.getMessage()); }, // onError { println("Sequence complete"); } // onCompleted );
1 2 3 Sequence complete
This variant of take
does not by default operate on any particular
Scheduler.
take(int)
There is also a variant of take
that takes a temporal duration rather than a quantity of
items. It results in an Observable that emits only those items that are emitted during that initial
duration of the source Observable’s lifespan. You set this duration by passing in a length of time
and the time units this length is denominated in as parameters to take
.
This variant of take
by default operates on the computation
Scheduler, but you may also pass in a Scheduler of your choosing as an
optional third parameter.
take(long,TimeUnit)
take(long,TimeUnit,Scheduler)
In RxJava, this operator is implemented as take
.
If you use the take(n)
operator (or its synonym, limit(n)
) on an
Observable, and that Observable emits fewer than n items before completing, the new,
take
-modified Observable will not throw an exception or invoke onError
, but
will merely emit this same fewer number of items before it completes.
Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .take(4) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
This variant of take
does not by default operate on any particular
Scheduler.
take(int)
There is also a variant of take
that takes a temporal duration rather than a quantity of
items. It results in an Observable that emits only those items that are emitted during that initial
duration of the source Observable’s lifespan. You set this duration by passing in a length of time
and the time units this length is denominated in as parameters to take
.
This variant of take
by default operates on the computation
Scheduler, but you may also pass in a Scheduler of your choosing as an
optional third parameter.
take(long,TimeUnit)
take(long,TimeUnit,Scheduler)
TBD
RxJS implements the take
operator.
var source = Rx.Observable.range(0, 5) .take(3); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Completed
For the special case of take(0)
you can also pass as a second parameter a
Scheduler that take
will use to immediately schedule a call
to onCompleted
.
take
is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxJS also implements a takeUntilWithTime
operator, which is like take
except that rather than taking a particular quantity of items, it takes all of the items that are
emitted during an initial period of time. You establish this period of by passing in a parameter to
takeUntilWithTime
, in either of these formats:
Date
You may also, optionally, pass in a Scheduler as a second parameter, and
the timer will operate on that Scheduler (takeUntilWithTime
uses the timeout
Scheduler by default).
var source = Rx.Observable.timer(0, 1000) .takeUntilWithTime(5000); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Next: 0 Next: 1 Next: 2 Next: 3 Next: 4 Completed
takeUntilWithTime
is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.time.js
(requires rx.js
or rx.compat.js
)rx.lite.js
rx.lite.compat.js
TBD
TBD
RxPHP implements this operator as take
.
Returns a specified number of contiguous elements from the start of an observable sequence
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/take.php $observable = Rx\Observable::fromArray([21, 42, 63]); $observable ->take(2) ->subscribe($stdoutObserver);
Next value: 21 Next value: 42 Complete!
RxPHP also has an operator takeUntil
.
Returns the values from the source observable sequence until the other observable sequence produces a value.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/takeUntil.php $source = \Rx\Observable::interval(105) ->takeUntil(\Rx\Observable::timer(1000)); $subscription = $source->subscribe($stdoutObserver);
Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 6 Next value: 7 Next value: 8 Complete!
TBD
TBD
TBD