Создание своего Publisher’a в Combine

от автора

Сегодня я хотел бы показать вам, как создать свой собственный Publisher в новом фреймворке от Apple Combine.

И так для начала нам нужно кратко вспомнить, как взаимодействуют между собой фундаментальные части Combine, а именно Publisher, Subscription, Subscriber.

  • Subscriber присоединяется к Publisher’у
  • Publisher отправляет Subscription Subscriber’у
  • Subscriber запрашивает N значений у Subscription
  • Publisher отправляет N значений или меньше
  • Publisher отправляет сигнал о завершении

Publisher

Что же приступим созданию нашего Publisher’a. Если обратиться к документации Apple, то мы увидим, что Publisher это протокол.

public protocol Publisher {      associatedtype Output     associatedtype Failure : Error      func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input }

Где Output — тип передаваемых значений этим Publisher’ом, Failure — тип ошибки, который должен следовать протоколу Error.
И функция receive(_: Subscriber), которая будет вызвана для добавления Subscriber’a к этому Publisher’у с помощью subscribe(_:).

В качестве примера реализуем Publisher, который будет генерировать для нас числа Фибоначчи.

struct FibonacciPublisher: Publisher {       typealias Output = Int     typealias Failure = Never }

Так как последовательность состоит из чисел, то в качестве выдаваемого значения Output будет тип Int, в качестве Failure будет особый тип Never, говорящий о том, что данный Publisher никогда не завершится с ошибкой.

Для гибкости укажем предельное количество элементов, которые мы хотим получить и обернем это значение в некий объект конфигурации нашего Publisher.

struct FibonacciConfiguration {     var count: UInt }

Рассмотрим этот код более внимательно, var count: UInt выглядит неплохим вариантом, но его использование ограничивает нас областью допустимых значений типа UInt и так же не совсем понятно, что указать, если мы хотим все же иметь неограниченную последовательность.
Вместо UInt воспользуемся типом Subscribers.Demand, который определен в Combine, там же он описывается как тип, который посылается от Subscriber’a к Publisher’у через Subscription. Если говорить по простому, то он показывает потребность в элементах, сколько элементов запрашивает Subscriber. unlimited — не ограничено, none — нисколько, max(N) — не больше N раз.

    public struct Demand : Equatable, Comparable, Hashable, Codable, CustomStringConvertible {         public static let unlimited: Subscribers.Demand         public static let none: Subscribers.Demand /// Эквивалент Demand.max(0)         @inlinable public static func max(_ value: Int) -> Subscribers.Demand         ....     }

Перепишем FibonacciConfiguration поменяв тип на новый у count.

struct FibonacciConfiguration {     var count: Subscribers.Demand }

Вернемся к Publisher’у и реализуем метод receive(_: Subscriber), как мы помним этот метод нужен для того, чтобы добавить Subscriber к Publisher’у. И делает он это с помощью подписки Subscription, Publisher должен создать подписку и передать эту подписку подписчику.

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {         let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration)         subscriber.receive(subscription: subscription)     }

Это дженерик функция, которая принимает в качестве параметра Subscriber, причем выходные значения Publisher’a должны соответствовать входным значениям Subscriber’a (Output == S.Input), тоже самое и для ошибок. Это необходимо для «соединения» Publisher’a и Subscriber’a.
В самой функции создаем подписку FibonacciSubscription, в конструкторе передаем подписчика и конфигурацию. После этого подписка передается подписчику.

Наш Publisher готов, в итоге имеем:

struct FibonacciPublisher: Publisher {      typealias Output = Int     typealias Failure = Never      var configuration: FibonacciConfiguration      func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {         let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration)         subscriber.receive(subscription: subscription)     } }

Как вы видите сам Publisher не содержит никакой логики по генерации последовательности Фибоначчи, вся логика будет в классе подписки — FibonacciSubscription.

Как вы уже догадываетесь, класс FibonacciSubscription будет следовать протоколу Subscription, посмотрим на определение этого протокола.

public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {     func request(_ demand: Subscribers.Demand) }

Функция request(_: Subscribers.Demand) сообщает Publisher’у, что он может отправить больше значений подписчику. Именно в этом методе и будет логика отправки чисел Фибоначчии.
Так же нам нужно реализовать следование протоколу Cancellable и реализовать функцию cancel().

public protocol Cancellable {     func cancel() }

И так же нужно следовать протоколу CustomCombineIdentifierConvertible и определить read-only переменную combineIdentifier.

public protocol CustomCombineIdentifierConvertible {     var combineIdentifier: CombineIdentifier { get } }

Тут есть уточнение, если прокрутить чуть ниже определения протокола CustomCombineIdentifierConvertible в Combine, то можно увидеть, что Combine предоставляет расширение для этого протокола, которое имеет вид —

extension CustomCombineIdentifierConvertible where Self : AnyObject {     public var combineIdentifier: CombineIdentifier { get } }

Что говорит нам о том, что определение переменной combineIdentifier: CombineIdentifier предоставляется по умолчанию, если тип, который следуют этому протоколу, так же следует протоколу AnyObject, а именно если этот тип класс. FibonacciSubscription — класс, поэтому мы получаем определение переменной по умолчанию.

Subscription

И так начнем реализовывать наш FibonacciSubscription.

private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int {      var subscriber: S?     var configuration: FibonacciConfiguration     var count: Subscribers.Demand      init(subscriber: S?, configuration: FibonacciConfiguration) {         self.subscriber = subscriber         self.configuration = configuration         self.count = configuration.count     }      ... }

Как можно видеть FibonacciConfiguration содержит сильную ссылку на Subscriber, иначе говоря является владельцем Subscriber. Это важный момент, подписка ответственна за удержание подписчика, причем удерживать его она должна до тех пор, пока он не завершит работу, не завершится с ошибкой или с отменной.

Далее реализуем метод cancel() из протокола Cancellable.

func cancel() {     subscriber = nil }

Установка subscriber в nil делает его недоступным подписке.

Теперь мы готовы приступить к самой реализации отправки чисел Фибоначчи.
реализуем метод request(_: Subscribers.Demand).

func request(_ demand: Subscribers.Demand) {         // 1         guard count > .none else {             subscriber?.receive(completion: .finished)             return         }         // 2         count -= .max(1)         subscriber?.receive(0)         if count == .none {             subscriber?.receive(completion: .finished)             return         }         // 3         count -= .max(1)         subscriber?.receive(1)         if count == .none {             subscriber?.receive(completion: .finished)             return         }         // 4         var prev = 0         var current = 1         var temp: Int         while true {             temp = prev             prev = current             current += temp             subscriber?.receive(current)             count -= .max(1)             if count == .none {                 subscriber?.receive(completion: .finished)                 return             }         }     }

1) С начала мы проверяем, сколько элементов может нам предоставить Publisher, если нисколько, то завершаем отправку и посылаем Subscriber’у сигнал о завершении отправки чисел.
2) Если потребность есть, то уменьшаем на единицу общее количество запрашиваемых чисел, отправляем Subscriber’у первый элемент последовательности Фибоначчи, а именно 0 и далее опять проверяем сколько еще элементов может нам предоставить Publisher, если нисколько, то отправляем Subscriber’у сигнал о завершении.
3) Такой же подход, как и во 2) пункте, но только для второго элемента в последовательности Фибоначии.
4) Если требуется больше, чем 2 элемента, то мы реализуем итеративный алгоритм нахождения чисел Фибоначчи, где на каждом шаге будем передавать очередное число из последовательности Фибоначии Subscriber’y и так же проверять сколько элементов еще может предоставить Publisher. Если Publisher больше не предоставляет новые числа, то отправляем Subscriber’у сигнал о завершении.

На данный момент мы написали такой код

struct FibonacciConfiguration {     var count: Subscribers.Demand }  struct FibonacciPublisher: Publisher {      typealias Output = Int     typealias Failure = Never      var configuration: FibonacciConfiguration      func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {         let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration)         subscriber.receive(subscription: subscription)     }  }  private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int {      var subscriber: S?     var configuration: FibonacciConfiguration     var count: Subscribers.Demand      init(subscriber: S?, configuration: FibonacciConfiguration) {         self.subscriber = subscriber         self.configuration = configuration         self.count = configuration.count     }      func cancel() {         subscriber = nil     }      func request(_ demand: Subscribers.Demand) {         // 1         guard count > .none else {             subscriber?.receive(completion: .finished)             return         }         // 2         count -= .max(1)         subscriber?.receive(0)         if count == .none {             subscriber?.receive(completion: .finished)             return         }         // 3         count -= .max(1)         subscriber?.receive(1)         if count == .none {             subscriber?.receive(completion: .finished)             return         }         // 4         var prev = 0         var current = 1         var temp: Int         while true {             temp = prev             prev = current             current += temp             subscriber?.receive(current)             count -= .max(1)             if count == .none {                 subscriber?.receive(completion: .finished)                 return             }         }     } }

Первое тестирование

Теперь протестируем, что у нас получилось, свой Publisher и Subscription у нас есть, не хватает Sibscriber’a, Combine предоставляет 2 Sibscriber’a из коробки это sink и assign.

  • sink — этот метод создает подписчика и сразу запрашивает неограниченное число значений.
  • assign — устанавливает каждый элемент от Publisher’a к свойству объекта.

sink хорошо подходит для нашей цели, особое внимание нужно обратить на то, что он запрашивает unlimited количество значений.

И тут нужно провести важное различие, наш Publisher в переменной count определяет количество элементов который может отдать наш Publisher и это условия определенно нами самими. В принципе мы могли обойтись и без этой переменной и не ограничиваться в передаче чисел Фибоначчи, но довольно скоро мы вышли бы за пределы области допустимых значений типа Int.
Случай с sink другой, каждый Subscriber определяет сколько значений он хочет получить, sink запрашивает неограниченное число значений, это значит что он будет получать значения до тех пор, пока не придет сигнал о завершении, ошибке или отмены.

Для удобства использования нашего Publisher’a добавим его создание в расширение протокола Publishers.

extension Publishers {     private static func fibonacci(configuration: FibonacciConfiguration) -> FibonacciPublisher {         FibonacciPublisher(configuration: configuration)     }      static func fibonacci(count: Subscribers.Demand = .max(6)) -> FibonacciPublisher {         FibonacciPublisher(configuration: FibonacciConfiguration(count: count))     } }

И так опробуем наш Publisher

Publishers.fibonacci(count: .max(10))     .sink { value in         print(value, terminator: " ") } // print 0 1 1 2 3 5 8 13 21 34 - OK

А теперь граничные случаи

Publishers.fibonacci(count: .max(1))     .sink { value in         print(value, terminator: " ") } // prinst 0 - OK  Publishers.fibonacci(count: .max(2))     .sink { value in         print(value, terminator: " ") } // prints 0 1 - OK  Publishers.fibonacci(count: .none)     .print() // логирует все события publisher'a     .sink { value in         print(value, terminator: " ") } // prints receive finished - OK

А что будет, если указать .unlimited?

Publishers.fibonacci(count: .unlimited)     .print()     .sink { value in         print(value, terminator: " ") } // prints 0 1 1 2 3 5 8 13 21 ... и потом креш, вышли за диапазон типа Int.

Как же можно использовать .unlimited, но иметь возможность вывода нескольких чисел? Для этого нам понадобится оператор .prefix(_), который работает аналогичным образом как .prefix(_) из коллекций, а именно оставляет только первые N элементов.

Publishers.fibonacci(count: .unlimited)     .print()     .prefix(5)     .sink { _ in } // prints 0 1 1 2 3 cancel и потом креш, похоже опять вышли за диапазон типа Int.

В чем же проблема? Может в .prefix(_)? Проведем маленький эксперимент на на стандартной последовательность из Foundation.

// бесконечная последовательность 1 2 3 4 5 6 7 8 ... 1...  .publisher .print() .prefix(5)     .sink { _ in } // prints 1 2 3 4 5 cancel - ОК

Как мы можем видеть, вышенаписанный код отработал корректно, значит проблема в нашей реализации Publisher.
Глянем на логи от .print() и увидим, что после N запросов, из .prefix(_) происходит вызов cancel() у нашего FibonacciSubscription, где мы устанавливаем subscriber в nil.

    func cancel() {         subscriber = nil     }

Если открыть стек вызовов, то можно увидеть, что cancel() вызывается из request(_:), а именно во время вызова subscriber?.receive(_). Из чего мы можем сделать вывод, что в некий момент времени внутри request(_:) subscriber может стать nil и тогда нужно прекратить работу генерации новых чисел. Добавим это условие в наш код.

    func request(_ demand: Subscribers.Demand) {         // 1         guard count > .none else {             subscriber?.receive(completion: .finished)             return         }         // 2         count -= .max(1)         subscriber?.receive(0)         guard let _ = subscriber else { return } // new         if count == .none {             subscriber?.receive(completion: .finished)             return         }          // 3         count -= .max(1)         subscriber?.receive(1)         guard let _ = subscriber else { return } // new         if count == .none {             subscriber?.receive(completion: .finished)             return         }          // 4         var prev = 0         var current = 1         var temp: Int         while let subscriber = subscriber { // new             temp = prev             prev = current             current += temp             subscriber.receive(current)             count -= .max(1)             if count == .none {                 subscriber.receive(completion: .finished)                 return             }         }     }

Теперь запустим наш тестовый код.

Publishers.fibonacci(count: .unlimited)     .print()     .prefix(5)     .sink { _ in } // prints 0 1 1 2 3 cancel - ОК

Получили ожидаемое поведение.

Subscriber

И так наш FibonacciSubscription готов? Не совсем, в наших тестах мы только использовали подписчика sink, который запрашивает .unlimited количество чисел, а что если вместо него использовать подписчика, который будет ожидать некоторое ограниченное число чисел. Combine не предоставляет такого подписчика, но что мешает нам написать свой? Внизу реализация нашего FibonacciSubscriber’a.

class FibonacciSubscriber: Subscriber {     typealias Input = Int     typealias Failure = Never      var limit: Subscribers.Demand      init(limit: Subscribers.Demand) {         self.limit = limit     }      func receive(subscription: Subscription) {         subscription.request(limit)     }      func receive(_ input: Input) -> Subscribers.Demand {         .none     }      func receive(completion: Subscribers.Completion<Failure>) {         print("Subscriber's completion: \(completion)")     } }

И так наш FibonacciSubscriber имеет свойство limit, которое определяет сколько элементов хочет получить данный Subscriber. И делается это в методе receive(_: Subscription), где мы сообщаем подписке сколько нам нужно элементов. Так же надо отметить функцию receive(_: Input) -> Subscribers.Demand, эта функция вызывается когда получено новое значение, в качестве возвращаемого значения мы указываем сколько дополнительный элементов мы хотим получить: .none — нисколько, .max(N) N штук, итого общее количество принимаемых элементов будет равно сумме значения посылаемого подписке в receive(_: Subscription) и всем возвращаемым значениям из receive(_: Input) -> Subscribers.Demand.

Второе тестирование

Попробуем использовать FibonacciSubscriber.

let subscriber = FibonacciSubscriber(limit: .max(3)) Publishers.fibonacci(count: .max(5))     .print()     .subscribe(subscriber) // prints 0 1 1 2 3 - а должно быть только 0 1 1

Как мы видим наш Publisher отправил 5 значений, вместо 3. Почему так? Потому что в методе request(_: Subscribers.Demand) FibonacciSubscription’a никак не учитывается потребность подписчика, давайте же это исправим, для этого добавим дополнительное свойство requested, через которое будем отслеживать потребность подписчика.

private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int {      var subscriber: S?     var configuration: FibonacciConfiguration     var count: Subscribers.Demand     var requested: Subscribers.Demand = .none // new      init(subscriber: S?, configuration: FibonacciConfiguration) {         self.subscriber = subscriber         self.configuration = configuration         self.count = configuration.count     }      func cancel() {         subscriber = nil     }      func request(_ demand: Subscribers.Demand) {         guard count > .none else {             subscriber?.receive(completion: .finished)             return         }         requested += demand // new         count -= .max(1)         requested -= .max(1) // new         requested += subscriber?.receive(0) ?? .none // new         guard let _ = subscriber, requested > .none else { return } // new         if count == .none {             subscriber?.receive(completion: .finished)             return         }          count -= .max(1)         requested -= .max(1) // new         requested += subscriber?.receive(1) ?? .none // new         guard let _ = subscriber, requested > .none else { return } // new         if count == .none {             subscriber?.receive(completion: .finished)             return         }          var prev = 0         var current = 1         var temp: Int         while let subscriber = subscriber, requested > .none { // new             temp = prev             prev = current             current += temp             requested += subscriber.receive(current) // new             count -= .max(1)             requested -= .max(1) // new             if count == .none {                 subscriber.receive(completion: .finished)                 return             }         }     } }

Третье тестирование

let subscriber = FibonacciSubscriber(limit: .max(3)) Publishers.fibonacci(count: .max(5))     .print()     .subscribe(subscriber) // prints 0 1 1 - OK

Теперь Publisher отрабатывает корректно.

Финальный код

import Foundation import Combine  struct FibonacciConfiguration {     var count: Subscribers.Demand }  struct FibonacciPublisher: Publisher {      typealias Output = Int     typealias Failure = Never      var configuration: FibonacciConfiguration      func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {         let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration)         subscriber.receive(subscription: subscription)     }  }  private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int {      var subscriber: S?     var configuration: FibonacciConfiguration     var count: Subscribers.Demand     var requested: Subscribers.Demand = .none      init(subscriber: S?, configuration: FibonacciConfiguration) {         self.subscriber = subscriber         self.configuration = configuration         self.count = configuration.count     }      func cancel() {         subscriber = nil     }      func request(_ demand: Subscribers.Demand) {         guard count > .none else {             subscriber?.receive(completion: .finished)             return         }         requested += demand         count -= .max(1)         requested -= .max(1)         requested += subscriber?.receive(0) ?? .none         guard let _ = subscriber, requested > .none else { return }         if count == .none {             subscriber?.receive(completion: .finished)             return         }          count -= .max(1)         requested -= .max(1)         requested += subscriber?.receive(1) ?? .none         guard let _ = subscriber, requested > .none else { return }         if count == .none {             subscriber?.receive(completion: .finished)             return         }          var prev = 0         var current = 1         var temp: Int         while let subscriber = subscriber, requested > .none {             temp = prev             prev = current             current += temp             requested += subscriber.receive(current)             count -= .max(1)             requested -= .max(1)             if count == .none {                 subscriber.receive(completion: .finished)                 return             }         }     } }  extension Publishers {     private static func fibonacci(configuration: FibonacciConfiguration) -> FibonacciPublisher {         FibonacciPublisher(configuration: configuration)     }      static func fibonacci(count: Subscribers.Demand = .max(6)) -> FibonacciPublisher {         FibonacciPublisher(configuration: FibonacciConfiguration(count: count))     } }  class FibonacciSubscriber: Subscriber {     typealias Input = Int     typealias Failure = Never      var limit: Subscribers.Demand      init(limit: Subscribers.Demand) {         self.limit = limit     }      func receive(subscription: Subscription) {         subscription.request(limit)     }      func receive(_ input: Input) -> Subscribers.Demand {        .none     }      func receive(completion: Subscribers.Completion<Failure>) {         print("Subscriber's completion: \(completion)")     } }  Publishers.fibonacci(count: .max(4))     .print()     .sink { _ in }  let subscriber = FibonacciSubscriber(limit: .max(3)) Publishers.fibonacci(count: .max(5))     .print()     .subscribe(subscriber) 

Результат

Я надеюсь, что эта статья принесла вам больше понимания, что такое Publisher, Subscription и Subscriber, как они взаимодействуют между собой и на какие моменты нужно обратить внимание, когда вы решили реализовать свой Publisher. Любые замечания, уточнения к статье приветствуются.


ссылка на оригинал статьи https://habr.com/ru/post/482690/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *