Rx. Постигаем retryWhen и repeatWhen на примерах из Android разработки

от автора

В сети очень много русско- и англоязычных статей по Rx операторам retryWhen и repeatWhen.
Несмотря на это, очень часто встречаю нежелание их использовать (ввиду сложного синтаксиса и непонятных диаграмм).

Приведу несколько как можно с их помощью эффективно перезапускать участки цепи и делегировать обработку перезапусков при ошибках и завершениях потока.

В примерах будет Java код с лямбдами (Retrolamda), но переписать его на Kotlin или чистую Java не составит труда.

Императивный способ перезапуска цепи

Предположим, мы используем Retrofit и загрузку начинаем в методе load(). Repository.getSomething() возвращает Single<Something>().

@NonNull private Subscription loadingSubscription = Subscriptions.unsubscribed();  private void load() {     subscription.unsubscribe();     subscription = repository              .getSomething()              .subscribe(result -> {}, err -> {}); }  private void update() {     load(); } 

Из какого-нибудь листенера обновлений (e.g. PullToRefreshView) мы вызываем метод update(), который, в свою очередь, вызовет метод load(), где с нуля будет создана подписка.

Я предпочитаю ко вниманию вариант использования более реактивного, на мой взгляд, способа с вышеупомянутым оператором repeatWhen().

Реактивный способ перезапуска цепи — repeatWhen

Создадим объект PublishSubject updateSubject, и передадим в оператор лямбду
repeatHandler -> repeatHandler.flatMap(nothing -> updateSubject.asObservable())

@NonNull private final PublishSubject<Void> updateSubject = PublishSubject.create();  private void load() {     repository             .getSomething()             .repeatWhen(repeatHandler ->                                 repeatHandler.flatMap(nothing -> updateSubject.asObservable()))             .subscribe(result -> {}, err -> {}); } 

Теперь для обновления загруженных данных нужно заэмитить null в updateSubject.

private void update() {     updateSubject.onNext(null); } 

Нужно помнить, что работает такой реактивный способ только с Single, который вызывает onComplete() сразу после эмита единственного элемента (будет работать и с Observable, но только после завершения потока).

Реактивный способ обработки ошибок retryWhen

Подобным образом можно обрабатывать и ошибки. Предположим, у пользователя пропала сеть, что приведет к ошибке и вызову onError() внутри Single, который возвращается методом getUser().

В этот момент можно показать пользователю диалог с текстом «Проверьте соединение», а по нажатию кнопки OK вызвать метод retry().

@NonNull private final PublishSubject<Void> retrySubject = PublishSubject.create();  private void load() {     repository             .getSomething()             .doOnError(err -> showConnectionDialog())             .retryWhen(retryHandler -> retryHandler.flatMap(nothing -> retrySubject.asObservable()))             .subscribe(result -> {}, err -> {}); }  private void retry() {     retrySubject.onNext(null); } 

По вызову retrySubject.onNext(null) вся цепочка выше retryWhen() переподпишется к источнику getUser(), и повторит запрос.

При таком подходе важно помнить, что doOnError() должен находиться выше в цепочке, чем retryWhen(), поскольку последний «поглощает» ошибки до эмита repeatHandler’а.

В данном конкретном случае выигрыша по производительности не будет, а кода стало даже чуть больше, но эти примеры помогут начать мыслить реактивными паттернами.

В следующем, бессовестно притянутом за уши, примере, в методе load() мы объединяем два источника оператором combineLatest.

Первый источник — repository.getSomething() загружает что-то из сети, второй, localStorage.fetchSomethingReallyHuge(), загружает что-то тяжелое из локального хранилища.

public void load() {     Observable.combineLatest(repository.getSomething(),                              localStorage.fetchSomethingReallyHuge(),                              (something, hugeObject) -> new Stuff(something, hugeObject))             .subscribe(stuff -> {}, err -> {}); } 

При обработке ошибки императивным способом, вызывая load() на каждую ошибку, мы будем заново подписываться на оба источника, что, в данном примере, абсолютно ненужно. При сетевой ошибке, второй источник успешно заэмитит данные, ошибка произойдет только в первом. В этом случае императивный способ будет еще и медленней.

Посмотрим, как будет выглядеть реактивный способ.

 public void load() {     Observable.combineLatest(             repository.getSomething()                     .retryWhen(retryHandler ->                                        retryHandler.flatMap(                                                err -> retrySubject.asObservable())),             localStorage.fetchSomethingReallyHuge()                     .retryWhen(retryHandler ->                                        retryHandler.flatMap(                                                nothing -> retrySubject.asObservable())),             (something, hugeObject) -> new Stuff(something, hugeObject))             .subscribe(stuff -> {}, err -> {}); } 

Прелесть такого подхода в том, что лямбда, переданная в оператор retryWhen() исполняется только после ошибки внутри источника, соответственно, если «ошибется» только один из источников, то и переподписка произойдет только на него, а оставшаяся цепочка ниже будет ожидать переисполнения.

А если ошибка произойдет внутри обоих источников, то один и тот же retryHandler сработает в двух местах.

Делегирование обработки ошибок

Следующим шагом можно делегировать обработку повторов некоему RetryManager. Перед этим еще можно немного подготовиться к переезду на Rx2 и убрать из наших потоков null объекты, которые запрещены в Rx2. Для этого можно создать класс:

public class RetryEvent { } 

Без ничего. Позже туда можно будет добавлять разные флаги, но это другая история. Интерфейс RetryManager может выглядеть как-то так:

interface RetryManager {      Observable<RetryEvent> observeRetries(@NonNull Throwable error);  } 

Реализация может проверять ошибки, показывать диалоги, снэкбар, устанавливать бесшумный таймаут — всё, что душе угодно. И слушать коллбэки от всех этих UI компонентов, чтобы в последствии заэмитить RetryEvent в наш retryHandler.

Предыдущий пример с использованием такого RetryManager будет выглядеть вот так:

//pass this through constructor, DI or use singleton (but please don't) private final RetryManager retryManager;  public void load() {     Observable.combineLatest(             repository.getSomething()                     .retryWhen(retryHandler ->                                        retryHandler.flatMap(                                                err -> retryManager.observeRetries())),             localStorage.fetchSomethingReallyHuge()                     .retryWhen(retryHandler ->                                        retryHandler.flatMap(                                                nothing -> retryManager.observeRetries())),             (something, hugeObject) -> new Stuff(something, hugeObject))             .subscribe(stuff -> {}, err -> {}); }  

Таким нехитрым образом обработка повторов при ошибках делегирована сторонней сущности, которую можно передавать как зависимость.

Надеюсь, эти примеры окажутся кому-то полезны и соблазнят попробовать repeatWhen() и retryWhen() в своих проектах.
ссылка на оригинал статьи https://habrahabr.ru/post/326890/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *