Retry

If a source Observable emits an error, resubscribe to it in the hope that it will complete without error.

When the source Observable issues an onError notification, the Retry operator does not pass that call through to its observers. Instead it resubscribes to the source Observable, giving it another opportunity to complete its sequence without error. Retry always passes onNext notifications through to its observers, even from sequences that terminate with an error, so observers can receive duplicate emissions (as shown in the diagram above).
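The resubscription behaviour and the resulting duplicate emissions can be modelled without any Rx library. What follows is a minimal, synchronous Python sketch, not the implementation of any ReactiveX library: an Observable is reduced to a function that takes three callbacks, and the names `flaky_source` and `retry` are hypothetical.

```python
def flaky_source(fail_times):
    """Hypothetical source: every subscription emits 1 and 2; the first
    `fail_times` subscriptions then fail, later ones emit 3 and complete."""
    state = {"subscriptions": 0}

    def subscribe(on_next, on_error, on_completed):
        state["subscriptions"] += 1
        on_next(1)
        on_next(2)
        if state["subscriptions"] <= fail_times:
            on_error(RuntimeError("boom"))
        else:
            on_next(3)
            on_completed()

    return subscribe


def retry(source):
    """Swallow errors and resubscribe until the source completes.
    Assumes the source always terminates synchronously."""
    def subscribe(on_next, on_error, on_completed):
        while True:
            outcome = {}
            source(on_next,                              # items pass straight through
                   lambda e: outcome.update(error=e),    # errors are intercepted
                   lambda: outcome.update(done=True))
            if outcome.get("done"):
                on_completed()
                return
            # otherwise the source failed: loop and subscribe again

    return subscribe


received = []
retry(flaky_source(fail_times=1))(
    received.append,
    lambda e: received.append("error"),
    lambda: received.append("completed"),
)
print(received)  # [1, 2, 1, 2, 3, 'completed']
```

The items 1 and 2 arrive twice because the failed first subscription had already delivered them before its error was intercepted.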

Language-Specific Information:

RxClojure does not implement the Retry operator.

RxCpp implements this operator as retry:

retry

retry takes a single argument, a count of the number of times it should try resubscribing to the source Observable when it encounters errors. If this count is exceeded, retry will not attempt to resubscribe and will instead pass the onError notification to its observers.

RxGroovy has two versions of this operator: retry and retryWhen.

retry

One variant of retry takes no parameters. It will continue to resubscribe to and mirror the source Observable no matter how many onError notifications it receives.

Another variant of retry takes a single parameter: a count of the number of times it should try to resubscribe to the source Observable when it encounters errors. If this count is exceeded, retry will not attempt to resubscribe again and will instead pass the latest onError notification to its observers.

A third variant of retry takes a predicate function as a parameter. You write this function to accept two arguments: an Integer count of how many retries have taken place thus far, and a Throwable indicating the error that caused the onError notification. This function returns a Boolean to indicate whether or not retry should resubscribe to and mirror the source Observable. If it does not, then retry passes the latest onError notification to its observers.

retry by default operates on the trampoline Scheduler.

retryWhen

The retryWhen operator is similar to retry but decides whether or not to resubscribe to and mirror the source Observable by passing the Throwable from the onError notification to a function that generates a second Observable, and observes its result to determine what to do. If that result is an emitted item, retryWhen resubscribes to and mirrors the source and the process repeats; if that result is an onError notification, retryWhen passes this notification on to its observers and terminates.

retryWhen by default operates on the trampoline Scheduler, and there is also a version that accepts a Scheduler as a parameter.

RxJava has two versions of this operator: retry and retryWhen.

retry

One variant of retry takes no parameters. It will continue to resubscribe to and mirror the source Observable no matter how many onError notifications it receives.

Another variant of retry takes a single parameter: a count of the number of times it should try to resubscribe to the source Observable when it encounters errors. If this count is exceeded, retry will not attempt to resubscribe again and will instead pass the latest onError notification to its observers.
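To make the counting concrete, here is a small library-independent Python sketch of the count-limited variant as described above, where the count is the number of resubscriptions rather than the total number of subscriptions. It is a synchronous model under that assumption, not RxJava code; `always_failing_source` and `retry` are hypothetical names.

```python
def always_failing_source():
    """Return (subscribe, log); every subscription emits one item, then fails."""
    log = []

    def subscribe(on_next, on_error, on_completed):
        log.append("subscribing")
        on_next("item")
        on_error(RuntimeError(f"failure #{len(log)}"))

    return subscribe, log


def retry(source, count):
    """Resubscribe after an error at most `count` times, then forward the
    latest error. Assumes the source terminates synchronously."""
    def subscribe(on_next, on_error, on_completed):
        resubscriptions = 0
        while True:
            outcome = {}
            source(on_next,
                   lambda e: outcome.update(error=e),
                   lambda: outcome.update(done=True))
            if outcome.get("done"):
                on_completed()
                return
            if resubscriptions == count:
                on_error(outcome["error"])  # count exceeded: give up
                return
            resubscriptions += 1

    return subscribe


source, log = always_failing_source()
errors = []
retry(source, count=2)(lambda item: None, errors.append, lambda: None)
print(len(log))        # 3: the initial subscription plus two retries
print(str(errors[0]))  # failure #3
```

The observer sees only the error from the final subscription; the two earlier errors are swallowed.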

A third variant of retry takes a predicate function as a parameter. You write this function to accept two arguments: an Integer count of how many retries have taken place thus far, and a Throwable indicating the error that caused the onError notification. This function returns a Boolean to indicate whether or not retry should resubscribe to and mirror the source Observable. If it does not, then retry passes the latest onError notification to its observers.
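A predicate is useful when only some errors are worth retrying. The Python sketch below models that decision outside of any Rx library; `scripted_source` and `retry_if` are hypothetical names, and passing 1 as the count on the first error is an assumption, since the description above does not pin down where counting starts.

```python
def scripted_source(errors):
    """Return (subscribe, log). The n-th subscription fails with errors[n-1];
    once the script is exhausted, subscriptions emit "ok" and complete."""
    log = []

    def subscribe(on_next, on_error, on_completed):
        log.append("subscribing")
        if len(log) <= len(errors):
            on_error(errors[len(log) - 1])
        else:
            on_next("ok")
            on_completed()

    return subscribe, log


def retry_if(source, predicate):
    """Resubscribe while predicate(count, error) returns True."""
    def subscribe(on_next, on_error, on_completed):
        attempts = 0
        while True:
            outcome = {}
            source(on_next,
                   lambda e: outcome.update(error=e),
                   lambda: outcome.update(done=True))
            if outcome.get("done"):
                on_completed()
                return
            attempts += 1  # assumption: the first error is reported with count 1
            if not predicate(attempts, outcome["error"]):
                on_error(outcome["error"])  # forward the latest error
                return

    return subscribe


source, log = scripted_source(
    [ConnectionError("timeout"), ConnectionError("reset"), ValueError("bad payload")]
)
failures = []
# Retry connection problems up to five times; give up on anything else.
retry_if(source, lambda n, e: n <= 5 and isinstance(e, ConnectionError))(
    lambda item: None, failures.append, lambda: None
)
print(len(log))                    # 3
print(type(failures[0]).__name__)  # ValueError
```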

retry by default operates on the trampoline Scheduler.

retryWhen

The retryWhen operator is similar to retry but decides whether or not to resubscribe to and mirror the source Observable by passing the Throwable from the onError notification to a function that generates a second Observable, and observes its result to determine what to do. If that result is an emitted item, retryWhen resubscribes to and mirrors the source and the process repeats; if that result is an onError notification, retryWhen passes this notification on to its observers and terminates.
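The decision flow in that paragraph can be sketched in a few lines of Python. This is a deliberately simplified, synchronous model rather than the RxJava operator: the second Observable is reduced to its first notification, returned as a `("next", value)` or `("error", exception)` tuple, and `retry_when`, `give_up_after` and `always_failing_source` are hypothetical names.

```python
def always_failing_source():
    """Return (subscribe, log); every subscription fails immediately."""
    log = []

    def subscribe(on_next, on_error, on_completed):
        log.append("subscribing")
        on_error(RuntimeError("always fails"))

    return subscribe, log


def retry_when(source, handler):
    """`handler(error)` stands in for the second Observable: it returns
    ("next", value) to request a resubscription or ("error", exc) to give up."""
    def subscribe(on_next, on_error, on_completed):
        while True:
            outcome = {}
            source(on_next,
                   lambda e: outcome.update(error=e),
                   lambda: outcome.update(done=True))
            if outcome.get("done"):
                on_completed()
                return
            kind, payload = handler(outcome["error"])
            if kind == "error":
                on_error(payload)  # pass the notification on and terminate
                return
            # kind == "next": loop and resubscribe to the source

    return subscribe


def give_up_after(limit):
    """Hypothetical handler: allow `limit` retries, then signal an error."""
    seen = []

    def handler(error):
        seen.append(error)
        if len(seen) <= limit:
            return ("next", len(seen))
        return ("error", error)

    return handler


source, log = always_failing_source()
failures = []
retry_when(source, give_up_after(3))(lambda item: None, failures.append, lambda: None)
print(len(log))          # 4: the initial subscription plus three retries
print(str(failures[0]))  # always fails
```

Because the handler is an ordinary function with its own state, the same shape accommodates policies such as back-off delays or per-error-type limits.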

retryWhen by default operates on the trampoline Scheduler, and there is also a version that accepts a Scheduler as a parameter.

Sample Code

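// RxJava 1.x imports
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;

// The source fails on every subscription.
Observable.create((Subscriber<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
    // Pair each error with 1, 2, 3, then wait that many seconds before retrying.
    return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
        System.out.println("delay retry by " + i + " second(s)");
        return Observable.timer(i, TimeUnit.SECONDS);
    });
}).toBlocking().forEach(System.out::println);

subscribing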
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing
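
One way to read the output above: zipping the stream of errors with a three-item range yields exactly three delays, which is consistent with three "delay retry" lines and four "subscribing" lines. The Python sketch below reproduces only that arithmetic with the standard library; it is an illustration of the pairing, not RxJava code, and the variable names are hypothetical.

```python
from itertools import count

# Stand-in for the `attempts` stream: the source fails on every subscription,
# so there is always another error available.
errors = (RuntimeError("always fails") for _ in count())

# Like zipWith(Observable.range(1, 3), (n, i) -> i): keep only the paired integer.
# zip stops as soon as the three-item range is exhausted.
delays = [i for _error, i in zip(errors, range(1, 4))]

# One initial subscription plus one resubscription per delay.
subscriptions = 1 + len(delays)

print(delays)         # [1, 2, 3]
print(subscriptions)  # 4
```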

RxJS implements this operator as retry.

retry

One variant of retry takes no parameters. It will continue to resubscribe to and mirror the source Observable no matter how many onError notifications it receives.

Another variant of retry takes a single parameter: a count of the number of onError notifications it should be willing to accept before it too fails and passes the onError to its observers. For example, retry(2) means that retry will resubscribe to and mirror the source Observable the first time it receives an onError notification, but will terminate with an error the second time this happens.
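Under this convention the argument bounds the total number of subscriptions, so retry(2) allows only one resubscription. A minimal Python sketch of that counting, written independently of RxJS and using the hypothetical names `retry_total` and `always_failing_source`:

```python
def always_failing_source():
    """Return (subscribe, log); every subscription fails immediately."""
    log = []

    def subscribe(on_next, on_error, on_completed):
        log.append("subscribing")
        on_error(RuntimeError(f"failure #{len(log)}"))

    return subscribe, log


def retry_total(source, attempts):
    """Subscribe at most `attempts` times in total; the error from the last
    allowed attempt is forwarded. Assumes attempts >= 1 and a source that
    terminates synchronously."""
    def subscribe(on_next, on_error, on_completed):
        for attempt in range(1, attempts + 1):
            outcome = {}
            source(on_next,
                   lambda e: outcome.update(error=e),
                   lambda: outcome.update(done=True))
            if outcome.get("done"):
                on_completed()
                return
            if attempt == attempts:
                on_error(outcome["error"])  # second error with attempts=2
                return

    return subscribe


source, log = always_failing_source()
failures = []
retry_total(source, 2)(lambda item: None, failures.append, lambda: None)
print(len(log))          # 2: the initial subscription plus one retry
print(str(failures[0]))  # failure #2
```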

retry is found in the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxKotlin has two versions of this operator: retry and retryWhen.

retry

One variant of retry takes no parameters. It will continue to resubscribe to and mirror the source Observable no matter how many onError notifications it receives.

Another variant of retry takes a single parameter: a count of the number of times it should try to resubscribe to the source Observable when it encounters errors. If this count is exceeded, retry will not attempt to resubscribe again and will instead pass the latest onError notification to its observers.

A third variant of retry takes a predicate function as a parameter. You write this function to accept two arguments: an Integer count of how many retries have taken place thus far, and a Throwable indicating the error that caused the onError notification. This function returns a Boolean to indicate whether or not retry should resubscribe to and mirror the source Observable. If it does not, then retry passes the latest onError notification to its observers.

retryWhen

The retryWhen operator is similar to retry but decides whether or not to resubscribe to and mirror the source Observable by passing the Throwable from the onError notification to a function that generates a second Observable, and observes its result to determine what to do. If that result is an emitted item, retryWhen resubscribes to and mirrors the source and the process repeats; if that result is an onError notification, retryWhen passes this notification on to its observers and terminates.

Rx.NET implements this operator as Retry.

Retry

One variant of Retry takes no parameters. It will continue to resubscribe to and mirror the source Observable no matter how many onError notifications it receives.

Another variant of Retry takes a single parameter: a count of the number of onError notifications it should be willing to accept before it too fails and passes the onError to its observers. For example, Retry(2) means that Retry will resubscribe to and mirror the source Observable the first time it receives an onError notification, but will terminate with an error the second time this happens.

RxPHP implements this operator as retry.

Repeats the source observable sequence the specified number of times or until it terminates successfully. If the retry count is not specified, it retries indefinitely. Note that the count includes the first attempt: if you encounter an error and want to retry once, you must use ->retry(2).

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/retry/retry.php

$loop      = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$count = 0;

$observable = Rx\Observable::interval(1000, $scheduler)
    ->flatMap(function ($x) use (&$count) {
        if (++$count < 2) {
            return Rx\Observable::error(new \Exception("Something"));
        }
        return Rx\Observable::just(42);
    })
    ->retry(3)
    ->take(1);

$observable->subscribe($stdoutObserver);

$loop->run();

   
Next value: 42
Complete!
    

RxPHP also has an operator retryWhen.

Repeats the source observable sequence on error when the notifier emits a next value. If the source observable errors and the notifier completes, it will complete the source sequence.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/retry/retryWhen.php

$loop      = new \React\EventLoop\StreamSelectLoop();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$source = Rx\Observable::interval(1000)
    ->map(function ($n) {
        if ($n === 2) {
            throw new Exception();
        }
        return $n;
    })
    ->retryWhen(function ($errors) {
        return $errors->delay(200);
    })
    ->take(6);

$subscription = $source->subscribe($createStdoutObserver(), $scheduler);

$loop->run();

   
Next value: 0
Next value: 1
Next value: 0
Next value: 1
Next value: 0
Next value: 1
Complete!
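
The repeating 0, 1 pattern in this output follows from each resubscription starting the interval over from zero. As a loose, pull-based analogy (Python generators rather than RxPHP observables, with the 200 ms delay left out), the same sequence can be reproduced with the standard library; `source` and `retry_forever` are hypothetical names.

```python
from itertools import islice


def source():
    """Yield 0, 1 and then fail, like the map() step that throws when $n === 2."""
    for n in range(10):
        if n == 2:
            raise RuntimeError("n reached 2")
        yield n


def retry_forever(factory):
    """Restart the generator returned by `factory` each time it raises."""
    while True:
        try:
            yield from factory()
            return  # finished without error
        except RuntimeError:
            continue  # start over with a fresh generator


# islice plays the role of take(6): stop after six values.
values = list(islice(retry_forever(source), 6))
print(values)  # [0, 1, 0, 1, 0, 1]
```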
    

RxPY implements this operator as retry:

retry

retry takes a single optional parameter, a count of the number of times it should try resubscribing to and mirroring the source Observable when it encounters errors. If this count is exceeded, retry will not attempt to resubscribe and will instead pass the onError notification to its observers. If you omit this parameter, retry will attempt to resubscribe and mirror indefinitely, no matter how many onError notifications it receives.

Rx.rb has two versions of this operator: retry and retry_infinitely.

retry

retry takes a single optional parameter, a count of the number of times it should try resubscribing to and mirroring the source Observable when it encounters errors. If this count is exceeded, retry will not attempt to resubscribe and will instead pass the onError notification to its observers.

retry_infinitely, on the other hand, will attempt to resubscribe to and mirror the source Observable indefinitely, no matter how many onError notifications it receives.

RxScala has two versions of this operator: retry and retryWhen.

retry

One variant of retry takes no parameters. It will continue to resubscribe to and mirror the source Observable no matter how many onError notifications it receives.

Another variant of retry takes a single parameter: a count of the number of times it should try to resubscribe to the source Observable when it encounters errors. If this count is exceeded, retry will not attempt to resubscribe again and will instead pass the latest onError notification to its observers.

A third variant of retry takes a predicate function as a parameter. You write this function to accept two arguments: an Int count of how many retries have taken place thus far, and a Throwable indicating the error that caused the onError notification. This function returns a Boolean to indicate whether or not retry should resubscribe to and mirror the source Observable. If it does not, then retry passes the latest onError notification to its observers.

retryWhen

The retryWhen operator is similar to retry but decides whether or not to resubscribe to and mirror the source Observable by passing the Throwable from the onError notification to a function that generates a second Observable, and observes its result to determine what to do. If that result is an emitted item, retryWhen resubscribes to and mirrors the source and the process repeats; if that result is an onError notification, retryWhen passes this notification on to its observers and terminates.