Введение в RxJava, часть первая – Вступление: Жизненный цикл подписки

от автора

image
Одна из главных идей, лежащих в основе Rx, заключается в том, что неизвестно когда именно последовательность выдаст новое значение или завершится. Однако, у нас есть возможность управлять временем в которое мы начнем или закончим получать эти значения. К тому же, если наши подписчики используют внешние ресурсы, то мы вероятно захотим освободить их по окончанию некой последовательности.

Содержание

Содержание:

  • Часть первая – Вступление
    1. Почему Rx?
    2. Ключевые типы
    3. Жизненный цикл подписки
  • Часть вторая – Последовательности
    1. Создание последовательности
    2. Фильтрация последовательности
    3. Исследование
    4. Агрегация
    5. Трансформация
  • Часть третья – Управление последовательностями
  • Часть четвертая – Параллельность

Подписка

Существует несколько перегруженных методов Observable::subscrube, выполняющих одну и ту же функцию

Subscription    subscribe() Subscription    subscribe(Action1<? super T> onNext) Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError) Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onComplete) Subscription    subscribe(Observer<? super T> observer) Subscription    subscribe(Subscriber<? super T> subscriber) 

subscribe() поглощает события, но сам по себе не выполняет непосредственных действий. Его перегруженные версии, имеющие хотя бы один параметр типа Action, создают объект Subscriber. Если не передать функции для событий onError и onCompleted, они попросту проигнорируются.

Subject<Integer, Integer> s = ReplaySubject.create(); s.subscribe(     v -> System.out.println(v),     e -> System.err.println(e)); s.onNext(0); s.onError(new Exception("Oops")); 

Вывод

0 java.lang.Exception: Oops 

Если мы не передать функцию для обработки ошибок, будет выброшено OnErrorNotImplementedException в месте, где на стороне провайдера вызван s.onError. В данном случае, провайдер[1] и потребитель[2] находятся в одном блоке кода, что позволяет использовать традиционный try-catch. Однако в реальности, провайдер и продюсер могут находится в разных местах. В таком случае, если потребитель не предоставит функцию для обработки ошибок, он никогда не узнает когда и по какой причине завершилась последовательность.

Отписка

Вы можете перестать получать данные еще до того как последовательность завершится. Все перегрузки метода subscribe возвращают объект интерфейса Subscribtion, который имеет 2 метода:

boolean isUnsubscribed() void unsubscribe() 

Вызов unsubscribe остановит поступление событий в observer.

Subject<Integer, Integer>  values = ReplaySubject.create(); Subscription subscription = values.subscribe(     v -> System.out.println(v),     e -> System.err.println(e),     () -> System.out.println("Done") ); values.onNext(0); values.onNext(1); subscription.unsubscribe(); values.onNext(2); 

Вывод

0 1 

Отписав одного подписчика, мы никак не повлияем на других подписчиков этого же ovbservable.

Subject<Integer, Integer>  values = ReplaySubject.create(); Subscription subscription1 = values.subscribe(     v -> System.out.println("First: " + v) ); Subscription subscription2 = values.subscribe(     v -> System.out.println("Second: " + v) ); values.onNext(0); values.onNext(1); subscription1.unsubscribe(); System.out.println("Unsubscribed first"); values.onNext(2); 

Вывод

First: 0 Second: 0 First: 1 Second: 1 Unsubscribed first Second: 2 

onError и onCompleted

onError и onCompleted означают завершение последовательности. Добросовестный observable, который следует контрактам Rx перестанет выдавать значения после наступления одного из этих событий. Это то, что следует всегда помнить при создании собственного Observable.

Subject<Integer, Integer>  values = ReplaySubject.create(); Subscription subscription1 = values.subscribe(     v -> System.out.println("First: " + v),     e -> System.out.println("First: " + e),     () -> System.out.println("Completed") ); values.onNext(0); values.onNext(1); values.onCompleted(); values.onNext(2); 

Вывод

First: 0 First: 1 Completed 

Освобождение ресурсов

Подписка удерживает в памяти ресурсы, с которыми связана. Эти ресурсы не будут автоматически освобождены при выходе объекта Subscription из области видимости. Если после вызова метода subscribe проигнорировать возвращаемое значение, то существует риск потерять единственную возможность отписаться. Подписка будет существовать далее, в то время как доступ к ней будет потерян, что может привести к утечке памяти и нежелательным действиям.

Существуют исключения. Например, при вызове перегруженных методов subscribe, неявно конструирующих объект Subscriber, будет создан механизм, который автоматически отвяжет подписчиков в момент когда завершится последовательность. Однако, даже в таком случае, следует помнить о бесконечных последовательностях. Вам все равно будет необходим объект Subscription, чтобы в какой-то момент прекратить получать данные от них.

Существует несколько реализаций интерфейса Subscription:

  • BooleanSubscription
  • CompositeSubscription
  • MultipleAssignmentSubscription
  • RefCountSubscription
  • SafeSubscriber
  • Scheduler.Worker
  • SerializedSubscriber
  • SerialSubscription
  • Subscriber
  • TestSubscriber

Мы еще встретимся с ними в будущих статьях. Стоит отметить, что Subscriber тоже реализует Subscription, что означает, что у нас также есть возможность отписаться используя ссылку на объект Subscriber.

С пониманием жизненного цикла подписок, вы получаете контроль над связанными с ними ресурсами. Это позволит вашей программе быть более предсказуемой, легко поддерживаемой и расширяемой и, надеемся, менее подверженной багам.

В следующией части мы научимся создавать и обрабатывать последовательности используя мощный инструментарий библиотеки.


[1] Тот, кто управляет (создает) observable – Примеч.
[2] Тот, кто использует значения, выданные observable – Примеч.

ссылка на оригинал статьи http://habrahabr.ru/post/270975/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *