Filter

emit only those items from an Observable that pass a predicate test

The Filter operator filters an Observable by only allowing items through that pass a test that you specify in the form of a predicate function.

See Also

Language-Specific Information:

filter

RxGroovy implements this operator as filter. You can filter an Observable, discarding any items that do not meet some test, by passing a filtering function into the filter operator. For example, the following code filters a list of integers, emitting only those that are even (that is, where the remainder from dividing the number by two is zero):

Sample Code

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

numbers.filter({ 0 == (it % 2) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
2
4
6
8
Sequence complete

filter does not by default operate on any particular Scheduler.

ofType

There is also a specialized form of the Filter operator in RxGroovy that filters an Observable so that it only emits items of a particular class.

ofType does not by default operate on any particular Scheduler.

filter

RxJava implements this operator as filter.

Sample Code

Observable.just(1, 2, 3, 4, 5)
          .filter(new Func1<Integer, Boolean>() {
              @Override
              public Boolean call(Integer item) {
                return( item < 4 );
              }
          }).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
Next: 1
Next: 2
Next: 3
Sequence complete.

filter does not by default operate on any particular Scheduler.

ofType

There is also a specialized form of the Filter operator in RxGroovy that filters an Observable so that it only emits items of a particular class.

ofType does not by default operate on any particular Scheduler.

filter

RxJS implements this operator under two names, but with identical behavior: filter and where. This operator takes two parameters: the predicate function, and an optional object that will represent that function’s “this” context when it executes.

The predicate function itself takes three arguments:

  1. the item from the source Observable to be, or not be, filtered
  2. the zero-based index of this item in the source Observable’s sequence
  3. the source Observable object

Write the predicate function so that it returns true for those items you want to pass through the filter to the next observer, and false for those items you want the filter to block and suppress.

Sample Code

var source = Rx.Observable.range(0, 5)
  .filter(function (x, idx, obs) {
    return x % 2 === 0;
  });

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Next: 0
Next: 2
Next: 4
Completed

filter and where are found in each of the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP implements this operator as filter.

Emit only those items from an Observable that pass a predicate test.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/filter/filter.php

$observable = Rx\Observable::fromArray([21, 42, 84]);
$observable
    ->filter(function ($elem) {
        return $elem >= 42;
    })
    ->subscribe($stdoutObserver);

   
Next value: 42
Next value: 84
Complete!
    

RxPHP also has an operator where.

Alias for filter