Sample

emit the most recent items emitted by an Observable within periodic time intervals

The Sample operator periodically looks at an Observable and emits whichever item it has most recently emitted since the previous sampling.

In some implementations, there is also a ThrottleFirst operator that is similar, but emits not the most-recently emitted item in the sample period, but the first item that was emitted during that period.

See Also

Language-Specific Information:

RxGroovy implements this operator as sample and throttleLast.

Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from this operator will emit no item for that sampling period.

sample

One variant of sample (or its alias, throttleLast) samples at a periodic time interval that you choose by passing in a TimeUnit and a quantity of such units as parameters to sample.

The following code constructs an Observable that emits the numbers between one and a million, and then samples that Observable every ten milliseconds to see what number it is emitting at that moment.

Sample Code

def numbers = Observable.range( 1, 1000000 );

numbers.sample(10, java.util.concurrent.TimeUnit.MILLISECONDS).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
339707
547810
891282
Sequence complete

This variant of sample operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.

sample

There ia also a variant of sample (that does not have a throttleLast alias) that samples the source Observable each time a second Observable emits an item (or when it terminates). You pass in that second Observable as the parameter to sample.

This variant of sample does not by default operate on any particular Scheduler.

throttleFirst

There is also a throttleFirst operator, which differs from throttleLast/sample in that it emits the first item emitted by the source Observable in each sampling period rather than the most recently emitted item.

Sample Code

Scheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // deliver
o.onNext(2); // skip
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // deliver
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // skip
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onCompleted();
1
3
7
Sequence complete

throttleFirst operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.

RxJava implements this operator as sample and throttleLast.

Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from this operator will emit no item for that sampling period.

sample

One variant of sample (or its alias, throttleLast) samples at a periodic time interval that you choose by passing in a TimeUnit and a quantity of such units as parameters to sample.

This variant of sample operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.

sample

There ia also a variant of sample (that does not have a throttleLast alias) that samples the source Observable each time a second Observable emits an item (or when it terminates). You pass in that second Observable as the parameter to sample.

This variant of sample does not by default operate on any particular Scheduler.

throttleFirst

There is also a throttleFirst operator, which differs from throttleLast/sample in that it emits the first item emitted by the source Observable in each sampling period rather than the most recently emitted item.

throttleFirst operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.

RxJS implements this operator with two variants of sample.

sample

The first variant accepts as its parameter a periodicity, defined as an integer number of milliseconds, and it samples the source Observable periodically at that frequency.

Sample Code

var source = Rx.Observable.interval(1000)
    .sample(5000)
    .take(2);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 3
Next: 8
Completed
sample

The second variant accepts as its parameter an Observable, and it samples the source Observable whenever this second Observable emits an item.

Sample Code

var source = Rx.Observable.interval(1000)
    .sample(Rx.Observable.interval(5000))
    .take(2);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 3
Next: 8
Completed
throttleFirst

There is also a throttleFirst operator, which differs from sample in that it emits the first item emitted by the source Observable in each sampling period rather than the most recently emitted item.

It does not have the variant that uses the emissions from a second Observable to regulate the sampling periodicity.

Sample Code

var times = [
    { value: 0, time: 100 },
    { value: 1, time: 600 },
    { value: 2, time: 400 },
    { value: 3, time: 900 },
    { value: 4, time: 200 }
];

// Delay each item by time and project value;
var source = Rx.Observable.from(times)
  .flatMap(function (item) {
    return Rx.Observable
      .of(item.value)
      .delay(item.time);
  })
  .throttleFirst(300 /* ms */);

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Next: 0
Next: 2
Next: 3
Completed

sample and throttleFirst operate by default on the timeout Scheduler. They are found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.time.js (requires rx.js or rx.compat.js)
  • rx.lite.js
  • rx.lite.compat.js

TBD