Отзывчивость, простота, гибкость и расширяемость кода — принципы, которые можно закрепить за реактивным UI.
Наверняка, если совместить реактивные backend и UI, то можно получить качественный продукт. Именно его мы и попытались сделать, разрабатывая 2GIS Dialer — звонилки, которая работает через API и при этом должна оставаться быстрой и удобной.
Зачем нам реактивное программирование
Рассмотрим пример:
requestDataTask = new AsyncTask<Void, Void, JSONObject>() { @Override protected JSONObject doInBackground(Void... params) { final String requestResult = apiService.getData(); final JSONObject json = JsonUtils.parse(requestResult); lruCache.cacheJson(json); return json; } };
Тут всё просто, мы создаем AsyncTask, в котором:
- Делаем запрос к API 2ГИС.
- Создаем
<code>JSONObject
на основе результата запроса. - Кэшируем
JSONObject
. - Возвращаем
JSONObject
.
Подобный код встречается во многих проектах, он понятен, а миллионы леммингов не могут ошибаться. Но давайте копнём чуть глубже:
- Что делать, если где-то во время выполнения выпал
Exception
? doInBackground(Void...)
выполняется в отдельном потоке, как нам сказать пользователю об ошибке в UI? Заводить поля дляException
?- А что возвращать, если не прошел запрос? null?
- А если json не валидный?
- Что стоит делать, если не удалось кэшировать объект?
И ведь это не самый сложный пример. Представьте, что вам надо сделать ещё один запрос на основе результатов предыдущего. На AsyncTask’ах это будет callback-hell, который, как минимум, будет неустойчив к падениям, ошибкам и т.д.
Вопросов больше, чем ответов. О недостатках AsyncTask’ов можно написать целую статью, серьезно. Но есть ли варианты лучше?
Фреймворк RxJava
Оглядываясь на принципы реактивного программирования мы начали искать решение, которое обеспечит:
- отсутствие зависаний и тормозов;
- масштабируемость на ресурсы смартфона;
- отсутствие крэшей;
- ориентированность на события.
Таковым стала RxJava от ребят из Netflix — reactive extension, идея (но не реализация) которого перекочевала из reactive extension for c#.
В RxJava всё крутится вокруг Observable
. Observable
— это как потоки данных (ещё их можно рассматривать как монады), которые могут каким-либо образом получать и отдавать эти самые данные. Над Observable
’ами можно применять операции, такие как flatmap
, filter
, zip
, merge
, cast
и т.д.
Простой пример:
//Observable, который последовательно будет давать нам элементы из Iterable final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5)) .subscribeOn(Schedulers.newThread()) //отдаем новый тред для работы в background .observeOn(AndroidSchedulers.mainThread()) //говорим, что обсервить хотим в main thread .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { //do something with result } });
Мы создаем Observable
, который поочередно отдает нам числа из Iterable
. Указываем, что генерация и передача данных будет происходить в отдельном треде, а обработка результата — в main thread. Подписываемся на него, и в методе подписчика производим любые манипуляции с каждым следующим результатом.
Можно сделать этот пример более интересным:
//Observable, который последовательно будет давать нам элементы из Iterable final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5)). //оператор фильтрации для отсеивания ненужных результатов filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 == 0; //выражение верно только для четных чисел } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { //do something with ONLY EVEN result } });
Теперь, указав оператор filter
, мы можем обрабатывать только чётные значения.
Как используют RxJava
Вернёмся к нашему первому AsyncTask и посмотрим, как бы мы решили задачу с помощью реактивного программирования.
Для начала создадим Observable с запросом:
//Observable, действия которого основанны на переданной ему Observable.OnSubscribe<String> Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { //сообщить сабскрайберу о том, что есть новые данные subscriber.onNext(apiService.getData()); //А теперь сообщаем о том, что мы закончили и данных больше нет subscriber.onCompleted(); } });
Тут мы создаем Observable
и специфицируем его поведение. Делаем запрос и отдаем результат в onNext(...)
, после чего говорим Subscriber
’у, что мы закончили, вызвав onCompleted()
.
С этим понятно: мы создали Observalble
, который отвечает только за получение объекта String
с API. SRP в чистом виде.
Что, если запрос не прошёл по каким-то причинам? Тогда мы можем позвать у Observable
метод retry(...)
, который будет повторять этот самый Observable
n раз, пока он не завершится успешно (читай, без Exception
). Кроме того, мы можем отдать Observable
’у другой Observable
, если даже retry()
не помог. Если backend написан криво, то лучше бы нам закрывать соединение по таймауту. И у нас есть метод timeout(...)
на этот случай. Всё вместе это выглядело бы так:
final Subscription subscription = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext(apiService.getData()); subscriber.onCompleted(); } }) .timeout(5, TimeUnit.SECONDS) //указываем таймаут операции в секундах .retry(3) // делаем 3 попытки запроса //назначаем обработчик в случае, если все таки мы не спасли положение .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() { @Override public Observable<? extends String> call(Throwable throwable) { //return new observable here, that can rescure us from error } });
И немного рефакторинга:
final Subscription subscription = createApiRequestObservable() //создали Observable с запросом .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут .retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов .onErrorResumeNext(createRequestErrorHandler()); // назначили обработчик ошибки
Теперь займемся созданием json. Для этого результат нашего первого Observable
(а там String
) надо преобразовать. Используем map(...)
, и, если что-то вдруг пойдет не так, вернем другой, нужный нам в случае неудачи, json с помощью onErrorReturn(...)
.
Вот так:
final Subscription subscription = createApiRequestObservable() .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) .retry(RETRY_COUNT_FOR_REQUEST) .onErrorResumeNext(createRequestErrorHandler()) //модифицируем Observable, чтобы тот преобразовывал String в JSONObject .map(new Func1<String, JSONObject>() { @Override public JSONObject call(String s) { return JsonUtils.parse(s); } }) //снова ставим обработчик ошибки //и вернем предустановленный "ошибочный" json .onErrorReturn(new Func1<Throwable, JSONObject>() { @Override public JSONObject call(Throwable throwable) { return jsonObjectForErrors; } });
Ок, с json разобрались. Осталось кэширование. Кэширование: это не преобразование результата, а действие над ним. Для этого у Observable
есть методы doOnNext(...)
, doOnEach(...)
и т.д. Получается как-то так:
final Subscription subscription = createApiRequestObservable() .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) .retry(RETRY_COUNT_FOR_REQUEST) .onErrorResumeNext(createRequestErrorHandler()) //модифицируем Observable, чтобы тот преобразовывал String в JSONObject .map(new Func1<String, JSONObject>() { @Override public JSONObject call(String s) { return JsonUtils.parse(s); } }) //снова ставим обработчик ошибки //и вернем предустановленный "ошибочный" json .onErrorReturn(new Func1<Throwable, JSONObject>() { @Override public JSONObject call(Throwable throwable) { return jsonObjectForErrors; } }) //процедура, вызывающаяся при каждом onNext(..) от Observable .doOnNext(new Action1<JSONObject>() { @Override public void call(JSONObject jsonObject) { lruCache.cacheJson(jsonObject); } });
Снова немного отрефакторим код:
final Subscription subscription = createApiRequestObservable() //создали Observable с запросом .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут .retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов .onErrorResumeNext(createRequestErrorHandler()) // назначили обработчик ошибки .map(createJsonMapOperator()) //модифицировали Observable, чтобы получать JSONObject .onErrorReturn(createJsonErrorHandler()) //возвращаем в случае ошибки то, что ожидаем .doOnNext(createCacheOperation()); //кэшируем JSONObject
Мы почти закончили. Как в самом первом примере с RxJava, добавим обработчик результата и укажем треды, в которых надо исполняться.
Финальная версия:
final Subscription subscription = createApiRequestObservable() //создали Observable с запросом .timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут .retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов .onErrorResumeNext(createRequestErrorHandler()) // назначили обработчик ошибки .map(createJsonMapOperator()) //модифицировали Observable, чтобы получать JSONObject .onErrorReturn(createJsonErrorHandler()) //возвращаем в случае ошибки то, что ожидаем .doOnNext(createCacheOperation()); //кэшируем JSONObject .subscribeOn(Schedulers.newThread()) //делаем запрос, преобразование, кэширование в отдельном потоке .observeOn(AndroidSchedulers.mainThread()) // обработка результата - в main thread .subscribe(subscriber); //обработчик результата
Давайте посмотрим, чего мы тут добились:
- Принцип отказоустойчивости в действии: результат выполнения всех операций всегда предсказуем. Мы знаем обо всех ошибках и потенциально проблемных местах, которые могут возникнуть в коде, и уже обработали их. Никаких исключений не будет.
- Принцип отзывчивости в действии: соединение с базой или сервером не зависнет благодаря таймауту, попытается сам восстановиться при ошибке и, что тоже важно, вернет результат сразу, до кэширования. А кэширование в
doOnNext
выполнится параллельно обработке результата. - Принцип ориентированности на события в действии: по ходу выполнения запроса и парсинга, мы всегда реагируем на события — события успешного/неуспешного завершения запроса, событие окончания парсинга json (2 реакции: обработка в UI и обработка в бэкграунд трэде для кэширования) и т.д. Кроме того, можно несколько раз подписываться на один
Observable
и держать в консистентном состоянии всю систему. - Код легко расширяем и почти не требует изменений. Если нам необходимо сделать логирование ошибки или сохранение стэктрейс, можно добавить метод
doOnError(Throwable thr)
. Хотите отфильтровать результаты — добавьте операторfilter
и реализуйте его поведение.
Как и недостатки AsyncTask’ов, преимущества этого подхода, на мой взгляд, можно перечислять очень долго. Последний из принципов реактивного программирования, принцип масштабируемости, продемонстрируем ниже.
RxJava в 2GIS Dialer
Живой пример:
//создаем новый Observable путем комбинирования четырех других final Observable<AggregatedContactData> obs = Observable.combineLatest( createContactsObservable(context), //Observable для получения контактов из базы createPhonesObservable(context), //Observable для получения всех телефонов контактов createAccountsObservable(context), //Observable для полуения аккаунтов и контактов по ним createJobsObservable(context), //Observable для получения мест работы контактов contactsInfoCombinator() //функция комбинироваия результатов всех Observable выше ).onErrorReturn(createEmptyObservable()).cache() //обработчик ошибки и оператор кэширования .subscribeOn(Schedulers.executor(ThreadPoolHolder.getGeneralExecutor())) //для выполнения такой задачи потребуется тред пул .observeOn(AndroidSchedulers.mainThread()); // обработка данных как всегда - в main thread
- Тут происходит сразу много интересного и посложнее описанного выше:
Первое, что бросается в глаза, этоObservable.combineLatest(...)
. Этот оператор ждетonNext(...)
от всех переданных емуObservable
’ов и применяет функцию комбинирования сразу ко всем результатам. Может показаться сложным, но картинка из вики RxJava сделает всё понятнее. Самое важное тут, что каждый изObservable
, переданных вObservable.combineLatest(...)
— этоCursorObservable
, который передает в свойonNext(...)
новый курсор, как только он меняется в базе данных. Таким образом, на любое обновление любого из четырех курсоров выполняется функция комбинирования, что позволяет всегда поставлять самые свежие данные. Это и есть принцип ориентированности на события. - Если что-то пошло не так, то мы исходя из своих нужд возвращаем требуемое. В данном случае
Collections.emptyList();
- Оператор
cache()
очень полезен, если на этотObservable
могут быть подписаны сразу несколькоSubscribers
. Если этот оператор применен кObservable
, то новый его подписчик мгновенно получает данные, при условии, что эти данные уже были посчитаны для подписавшихся ранее. Таким образом, все желающие имеет актуальные одинаковые данные. - А вот тут видно принцип масштабируемости: в
subscribeOn(...)
я отдаю тред пул на 4 треда, чтобы каждый из 4х моихObservable
выполнялся в отдельном треде с целью максимизации скорости, всю остальную заботу берет на себя RxJava. То есть задействованы будут все 4 процессора, при наличие оных.
Как видите, потенциал у реактивного программирования огромный, а фукнционал RxJava реализует его в достаточной мере.
Проблемы, с которыми мы столкнулись
Всё, продемонстрированное выше и намного больше в том или ином виде используется у нас в дайлере. И вот с какими проблемами мы столкнулись:
- Проблема OOM. Наивно полагать, что Android может дать много тредов для многопоточной работы. При количестве тредов больше 15, даже топовые смартфоны начинали “задумываться”, а их мелкие собратья и вовсе падали с
OutOfMemoryError
. Решение было простым. ВвестиCachedThreadPool
для этих дел и проблема решена. - Кэширование запросов. Речь не про оператор
cache()
из примера выше. Хотелось бы, чтобы следующий запрос на тот же самый url сразу брался из кэша. В RxJava такого нет. В принципе это правильно, потому что реактивность и кэш — две разные вещи. Поэтому мы написали свой кэш.
Что еще?
Мы увидели, как классно реактивно работать с многопоточностью и запросами в Android. Но это далеко не всё. Например, можно подписываться на изменение Checkable
или EditText
(это из коробки идет в RxJava для Android). Тут всё просто, но ужасно удобно.
Кстати, одной RxJava реактивное программирование под Java не ограничивается. Существуют и другие библиотеки, например, Bolts-Android.
Кроме этого, сейчас активно разрабатывается Reactive-Streams, который призван унифицировать работу с разными реактивными провайдерами в java.
Вывод
Понравилось ли нам? Однозначно. Реактивные приложения действительно гораздо устойчивее к багам и падениям, код становится понятным и гибким (были бы лямбды — был бы еще и красивым). Много рутинной работы перекладывается на библиотеку, которая выполняет свою работу лучше, чем нативные Android-компоненты. Это позволяет сосредоточиться на реализации вещей, которые действительно стоит обдумать.
Реактивное программирование — это немного другое мышление по сравнению с традиционной разработкой под Android. Потоки данных, функциональные операторы — эти сложные, на первый взгляд, вещи оказываются намного проще, если разобраться. Стоит немного поработать с реактивным программированием, и мозг начинает перестраиваться с объектов и состояний на монады и операторы над ними. Это такая большая, добрая, мощная частичка ФП в ООП, которая делает жизнь и код проще, а приложение лучше. Попробуйте, не пожалеете.
Ссылки, которые помогут вам разобраться с реактивным программированием или просто могут оказаться интересными:
- Википедия RxJava.
- Очень толковая статья от ведущего программиста SoundHound и евангелиста RxJava под Android.
- Реактивный манифест.
- Reactive-Streams.
Небольшое отступление. Если вы разделяете наши взгляды на программирование и создание продуктов, то приходите — будем рады вас видеть в команде 2GIS Dialer.
ссылка на оригинал статьи http://habrahabr.ru/company/2gis/blog/228125/
Добавить комментарий