Потоко-защищённая шина событий в Swift

от автора

В крупных приложениях для iOS взаимодействие между компонентами зачастую оказывается сложнее, чем сам компонент. Сервис завершает операцию, координатор должен отреагировать, возможно, потребуется обновить несколько экранов, и передача каждой зависимости по всему дереву навигации быстро начинает казаться излишней «рутинной» работой. Внедрение зависимостей и управление состоянием по-прежнему имеют своё место. Шина (данных) событий предоставляет нам ещё один инструмент для слабосвязанных уведомлений, где прямое управление добавило бы ненужную сложность. Цель этого компонента проста: позволить одной части приложения публиковать событие, а другим частям приложения — подписываться на события определенного типа. Реализация сосредоточена на типовой безопасности, потокобезопасном хранении, автоматической очистке при деаллокации владельца, явной отмене отдельных подписок, доставке MainActor для кода пользовательского интерфейса и поддержке AsyncStream для потребителей async/await.

Определение события

Каждое событие, передаваемое по шине, соответствует протоколу с использованием небольших маркеров:

public protocol EventBusEvent: Sendable {}

Протокол не требует наличия каких-либо методов или свойств. Его цель — обозначить тип как событие, которое можно публиковать через Шину Событий.

Например:

struct UserDidLoginEvent: EventBusEvent {    let userID: String}

Требование Sendable имеет важное значение, поскольку события могут пересекать границы параллелизма. Опубликованное событие может передаваться из одной задачи в другую, обрабатываться через AsyncStream или планироваться на MainActor. Требование Sendable способствует тому, чтобы полезные нагрузки событий проектировались в виде небольших неизменяемых значений.

Тип EventBus

Корневым элементом является класс:

public final class EventBus: @unchecked Sendable {    private let lock = NSLock()    private var subscriptionsByEvent: [ObjectIdentifier: [SubscriptionEntry]] = [:]    public static let `default` = EventBus()    public init() {}}

Шина хранит подписки в словаре. Ключом служит ObjectIdentifier, созданный на основе типа события. Значением является массив подписок на это событие.

Концептуально хранилище выглядит так:

UserDidLoginEvent.self -> [subscription, subscription]CartDidUpdateEvent.self -> [subscription]

Это позволяет шине разделять подписки по типам событий, при этом по-прежнему используя единое внутреннее хранилище.

Класс помечен как @unchecked Sendable, поскольку безопасность для многопоточного выполнения обеспечивается вручную с помощью NSLock. Компилятор не может полностью проверить безопасность изменяемого словаря, поэтому ответственность за синхронизацию доступа берет на себя реализация.

Компонент также предоставляет общий экземпляр (по-сути, является синглтоном):

public static let `default` = EventBus()

В то же время инициализатор является общедоступным, поэтому шину можно использовать в качестве встраиваемой зависимости в тестах, функциональных модулях или изолированных потоках.

Оповещение о событии

API оповещения лаконичен:

public func publish<Event: EventBusEvent>(_ event: Evesw

Оповещение начинается с создания ключа на основе типа события:

let eventKey = ObjectIdentifier(Event.self)

Затем шина блокирует внутреннюю память, находит подписки для данного типа события, удаляет неактивные или отмененные подписки и создаёт снэпшот получения замыканий:

let receivers: [(Any) -> Void] = lock.withLock {    guard var bucket = subscriptionsByEvent[eventKey] else {        return []    }    bucket = cleanDeadSubscriptions(in: bucket)    subscriptionsByEvent[eventKey] = bucket.isEmpty ? nil : bucket    return bucket.map(\.subscription.receive)}

После снятия блокировки шина передает событие:

receivers.forEach { $0(event) }

Это одно из ключевых проектных решений. Блокировка защищает только внутреннюю структуру данных. Обработчики подписок вызываются вне блокировки, что позволяет избежать взаимодействия пользовательского кода с механизмом синхронизации.

Это также определяет семантику доставки снэпшотов компонента. Вызов оповещения работает со списком подписчиков, существовавшим на момент создания снэпшота. Подписчики, добавленные во время текущего вызова оповещени, получают будущие события. Подписчики, удаленные во время доставки, могут еще присутствовать в снэпшоте, поэтому каждая подписка проверяет состояние своей отмены перед вызовом обработчика.

В случае обычных подписок при выполнении команды publish создается снэпшот, а событие передается синхронно после снятия блокировки.

Подписка на события

Основной API подписки принимает в качестве аргументов владельца и обработчик:

@discardableResultpublic func subscribe<Owner: AnyObject, Event: EventBusEvent>(    owner: Owner,    to eventType: Event.Type = Event.self,    handler: @escaping (Owner, Event) -> Void) -> SubscriptionToken

Типичный пример использования может выглядеть следующим образом:

eventBus.subscribe(owner: self, to: UserDidLoginEvent.self) { owner, event in    owner.handleUserLogin(event)}

Интерес представляет параметр owner. Подписка связана с объектом, и шина хранит этот объект в режиме слабой ссылки. Когда память объекта owner освобождается, подписка становится неактивной и впоследствии очищается в ходе операций оповещений или подписки.

В данной реализации возвращаемый токен не является обязательным для управления жизненным циклом. Жизненный цикл подписки зависит от жизненного цикла владельца. Токен используется в том случае, если необходимо вручную отменить конкретную подписку.

let token = eventBus.subscribe(owner: self) { owner, event in    owner.handle(event)}token.cancel()

Каждый вызов функции subscribe создает отдельную подписку. Один и тот же владелец может подписаться на один и тот же тип события несколько раз, и каждая подписка получает свой собственный токен.

Явная поддержка владельца

Формат обработчика с учетом владельца — это небольшое, но важное решение в отношении API:

{ owner, event in    owner.handle(event)}

Шина слабо захватывает объект владельца и передает в обработчик сильную ссылку только в том случае, если объект-владелец все еще существует.

Это позволяет избежать типичной ситуации, когда для каждой подписки требуется собственный блок [weak self]:

{ [weak self] event in    self?.handle(event)}

API также предоставляет перегрузку, работающую только с событиями:

@discardableResultpublic func subscribe<Owner: AnyObject, Event: EventBusEvent>(    owner: Owner,    to eventType: Event.Type = Event.self,    handler: @escaping (Event) -> Void) -> SubscriptionToken

Эта перегрузка полезна для простых реакций, в которых владелец используется исключительно в качестве ориентира для определения срока жизни. Когда обработчику требуется выполнить обратный вызов в объект-владелец, форма с учетом владения позволяет явно сохранить модель владения в месте вызова.

Представление внутренней подписки

Внутри системы каждая подписка хранится в виде объекта SubscriptionEntry:

struct SubscriptionEntry {    let id: UUID    let subscription: Subscription}

UUID идентифицирует одну конкретную подписку. Это позволяет возвращаемому SubscriptionToken отменить именно ту подписку, на основании которой он был создан.

Сама подписка хранит слабый указатель на владельца, состояние отмены и замыкание с аргументом, у которого очищен тип (type-erased):

final class Subscription {    private weak var owner: AnyObject?    let cancellationState: CancellationState    let receive: (Any) -> Void}

Замыкание принимает тип Any, поскольку шина хранит подписки на множество общих типов событий в одном словаре. Внутри замыкания тип события преобразуется в ожидаемый тип:

self.receive = { [weak owner] rawEvent in    guard !cancellationState.cancelled(),          let owner,          let event = rawEvent as? Event else {        return    }    handler(owner, event)}

Перед запуском обработчика система проверяет, не была ли подписка отменена, находится ли владелец в системе и соответствует ли тип события ожидаемому. Затем она вызывает обработчик.

Отмена (cancellation)

Возвращаемый токен представляет одну конкретную подписку:

public final class SubscriptionToken: @unchecked Sendable {    private let lock = NSLock()    private let cancellationState: CancellationState    private let cancelClosure: @Sendable () -> Void    private var isCancelled = false    public func cancel() {        lock.lock()        guard !isCancelled else {            lock.unlock()            return        }        isCancelled = true        lock.unlock()        cancellationState.cancel()        cancelClosure()    }}

Отмена состоит из двух этапов.

Сначала общий параметр CancellationState помечается как «отмененный». Это предотвращает доставку, даже если подписка уже была скопирована в снэпшот публикации.

Второй: подписка физически удаляется из внутреннего словаря.

Разделение на этапы имеет важное значение: поскольку в системе публикации используется механизм доставки с помощью снэпшотов. Подписка может присутствовать в снэпшоте, а затем быть отменена до вызова соответствующего обработчика. В таком случае, состояние отмены обеспечивает безопасность, не усложняя при этом публичный API.

Отмена подписки по владельцу (Owner)

Шина предоставляет также отмену подписки, основанную на владельце:

public func unsubscribe<Owner: AnyObject, Event: EventBusEvent>(    owner: Owner,    from eventType: Event.Type = Event.self)

Это удаляет все подписки, созданные этим владельцем для одного типа события.

Существует также более общий вариант:

public func unsubscribeAll(for owner: AnyObject)

Это удаляет все подписки, созданные владельцем для всех типов событий.

Эти API полезны в тех случаях, когда компонент имеет четко очерченные границы жизненного цикла и необходимо явно отключить его от группы событий. Для отмены одной конкретной подписки токен является более точным инструментом.

Ленивая очистка

Для этого шине не требуется хук deinit на стороне подписчика. Вместо этого она очищает неактивные подписки по мере необходимости:

func cleanDeadSubscriptions(in bucket: [SubscriptionEntry]) -> [SubscriptionEntry] {    bucket.filter {        $0.subscription.isAlive && !$0.subscription.cancellationState.cancelled()    }}

Эта очистка выполняется во время подписки и публикации.

Подписка считается неактивной, если её владелец был удалён из списка абонентов или если её статус отменен. Шина удаляет эти записи из корзины, прежде чем продолжить работу.

Благодаря этому публичный API остается компактным. Подписчик может полагаться на время жизненного цикла объекта, в то время как шина поддерживает порядок во внутреннем хранилище в ходе обычной работы.

Одноразовая подписка

Некоторые события используются только один раз. Для таких случаев шина предоставляет:

@discardableResultpublic func subscribeOnce<Owner: AnyObject, Event: EventBusEvent>(    owner: Owner,    to eventType: Event.Type = Event.self,    handler: @escaping (Owner, Event) -> Void) -> SubscriptionToken

Однократная подписка использует небольшую блокировку, защищающую доступ. Это является более надежной гарантией, чем просто отмена после первого события, поскольку несколько вызовов оповещения могли уже зафиксировать эту же подписку в своих снэпшотах.

После первой доставки подписка автоматически отменяется:

let token = subscribeInternal(owner: owner, to: eventType) { owner, event in    guard onceGate.consumeFirstDelivery() else {        return    }    tokenBox.cancelAndClear()    handler(owner, event)}

Таким образом, публичный API остается выразительным, а обработка критичных случаев остается внутри компонента.

В реализации используется небольшой вспомогательный объект под названием OnceGate:

final class OnceGate: @unchecked Sendable {    private let lock = NSLock()    private var hasDelivered = false    func consumeFirstDelivery() -> Bool {        lock.lock()        defer { lock.unlock() }        guard !hasDelivered else {            return false        }        hasDelivered = true        return true    }}

Этот шлюз гарантирует, что обработчик запускается только один раз, даже если несколько вызовов оповещения одновременно обращаются к одному и тому же снэпшоту подписки.

Подписки на главном потоке (MainActor)

Коду пользовательского интерфейса (User Interface) часто требуется обработка событий на главном потоке. Для этого шина предоставляет специальный API:

@MainActor@discardableResultpublic func subscribeOnMain<Owner: AnyObject, Event: EventBusEvent>(    owner: Owner,    to eventType: Event.Type = Event.self,    handler: @escaping @MainActor (Owner, Event) -> Void) -> SubscriptionToken

Внутренняя подписка организует доставку следующим образом:

self.receive = { rawEvent in    guard !cancellationState.cancelled(),          let event = rawEvent as? Event else {        return    }    Task { @MainActor in        guard !cancellationState.cancelled(),              let owner = ownerRef.owner as? Owner else {            return        }        handler(owner, event)    }}

Это означает, что событие может быть опубликовано из любого контекста, в то время как обработчик запускается на MainActor.

Стоит отметить, что доставка событий в MainActor происходит асинхронно. При использовании обычного механизма подписки, события доставляются синхронно, тогда как при использовании subscribeOnMain событие сначала передается в главный актор, а уже после этого вызывается обработчик.

Кроме того, предусмотрено две проверки на отмену. Одна из них выполняется до планирования задачи, а другая — во время ее выполнения. Это позволяет учесть случай, когда подписка отменяется после планирования задачи, но до того, как main actor будет вызван обработчиком.

Этот API полезен для вью контроллеров, вью моделей, координаторов и других объектов, связанных с пользовательским интерфейсом.

Поддержка AsyncStream

Шина также может предоставлять события в виде AsyncStream:

public func stream<Event: EventBusEvent>(    _ eventType: Event.Type = Event.self,    bufferingPolicy: AsyncStream<Event>.Continuation.BufferingPolicy = .bufferingNewest(100)) -> AsyncStream<Event>

Это позволяет обрабатывать события с помощью async/await:

for await event in eventBus.stream(UserDidLoginEvent.self) {    print(event)}

Внутри потока создается частный объект-владелец:

final class StreamOwner: @unchecked Sendable {}

Этот владелец сохраняется до тех пор, пока поток активен. Когда поток завершается, подписка аннулируется:

streamContinuation.onTermination = { @Sendable _ in    tokenBox.cancelAndClear()}

При этом сохраняется та же модель жизненного цикла, основанная на владельце, которая используется в стандартном API подписок. По умолчанию используется следующая политика буферизации:

.bufferingNewest(100)

Это обеспечивает потоку буфер с ограниченным размером и позволяет сохранять последние события в случае, если “читатель” работает медленнее, чем “писатель”.

Почему реализация использует NSLock

В данной реализации используется NSLock, поскольку объем общего изменяемого состояния невелик, а критические участки кода короткие. Блокировка защищает только словарь подписок. Обработчики вызываются после создания снэпшота и снятия блокировки.

Такая структура позволяет сохранить синхронность и предсказуемость стандартного процесса публикации:

eventBus.publish(event)

Интеграции MainActor и AsyncStream добавляют асинхронное поведение только в тех случаях, когда API явно этого требует.

Для такого рода компонентов реализация на основе блокировок работает хорошо, поскольку синхронизация ограничивается доступом к хранилищу, в то время как пользовательский код выполняется вне блокировки.

Пример реализации

Вот небольшой пример того, как EventBus можно использовать в приложении.

Сначала определите событие:

struct UserDidLoginEvent: EventBusEvent {    let userID: String}

Затем опубликуйте это из того места, где происходит действие:

final class AuthService {    private let eventBus: EventBus    init(eventBus: EventBus = .default) {        self.eventBus = eventBus    }    func completeLogin(userID: String) {        eventBus.publish(UserDidLoginEvent(userID: userID))    }}

Любой объект, которому необходимо реагировать на событие, может подписаться на него:

final class ProfileViewModel {    private let eventBus: EventBus    init(eventBus: EventBus = .default) {        self.eventBus = eventBus        eventBus.subscribe(owner: self) { owner, event: UserDidLoginEvent in            owner.reloadProfile(for: event.userID)        }    }    private func reloadProfile(for userID: String) {        // Reload user-specific data    }}

Модель представления не должна знать о существовании AuthService, а AuthService не должен знать, какие экраны или координаторы заинтересованы в событии входа в систему. Они взаимодействуют посредством типизированного события, при этом срок действия подписки привязан к экземпляру ProfileViewModel.

Для подписчиков, связанных с пользовательским интерфейсом, ту же идею можно реализовать с помощью subscribeOnMain:

eventBus.subscribeOnMain(owner: self) { owner, event: UserDidLoginEvent in    owner.updateUI(for: event.userID)}

Благодаря этому передача событий остается явной, типобезопасной и согласованной со сроком жизни подписчика.

Заключение

Самая важная часть этой архитектуры — модель владения. Подписка действует в течение всего жизненного цикла владельца. Шина не требует от подписчиков поддерживать токены в активном состоянии только для предотвращения утечек. Токен остается доступным до явной отмены конкретной подписки.

Это небольшое решение определяет всю остальную реализацию: слабые ссылки на владельца, отложенная очистка, состояние отмены, доставка снэпшотов, поддержка MainActor и мостик AsyncStream — все это построено на основе одной и той же модели жизненного цикла.

Результатом является небольшой инфраструктурный компонент, который помогает развязать части приложения для iOS, сохраняя при этом типобезопасность и предсказуемость потока событий.

ссылка на оригинал статьи https://habr.com/ru/articles/1042258/