Реализация итоговой согласованности. Разбор библиотеки event-outbox

от автора

Здравствуйте. Меня зовут Юрий Кехтер, я backend-разработчик на Python.

В этой статье я хотел бы рассказать об архитектурных шаблонах Transactional Outbox и Idempotent Consumer. Кроме того, я хотел бы показать собственную реализацию, содержащую интересное сочетание технологий, выходящее за рамки этих шаблонов, значительно упрощающее реализацию и эксплуатацию:

  • Альтернатива поллингу (polling) при публикации событий.

  • Автоматическое разделение работы по публикации событий.

  • Автоматическое удаление устаревших данных.

  • Возможность запуска в одном процессе с HTTP-сервером.

Я предпочитаю не смешивать разные языки (languages), поэтому постараюсь указывать в скобках название термина на английском, чтобы избежать разночтений.

Библиотека event-outbox написана на Python в порыве энтузиазма, во время «вынужденной паузы» на моей текущей работе. По состоянию на июнь 2024 года, она еще не добралась до версии 1.0.0 и ни разу не использовалась по настоящему. К тому же, ей не хватает полноценной документации.

Код в статье предоставлен исключительно для демонстрации и отличается от исходного кода библиотеки. Я использую синтаксис ... (ellispis), чтобы опустить некоторые несущественные детали, но предоставить достаточно контекста для понимания кода.

Если Вы по какой-то причине решите использовать эту библиотеку, прошу вас связаться со мной, ведь я заинтересован в её развитии. Если Вы чувствуете непреодолимое желание помочь проекту за идею, то приглашаю вас объединить усилия в open-source.

Проблема

Требуется гарантировать итоговую согласованность данных (eventual consistency) при выполнении двух упорядоченных действий на двух разных сервисах в распределенной системе. Между действиями допустима задержка. Общение между сервисами происходит по нестабильной сети. Сервисы могут падать.

Если пренебречь условием «на разных сервисах», то речь идет о монолите (monolith), а решение очевидно. Действия происходят один за другим, изменения данных накапливаются в транзакции. После фиксации транзакции (commit), согласованные изменения записываются в базу данных. Допустимость задержки между действиями игнорируется. Проблема нестабильной сети обходится стороной. Транзакция спасает целостность данных от отказа сервиса в процессе обработки запроса.

Если все же речь идет о двух разных сервисах, то для гарантии целостности (consistency) данных, проблемы нестабильной сети и падения сервисов игнорировать не получится.

Обозначим порядок обработки:

  1. Первый сервис получает запрос (request) от клиента.

  2. Первый сервис выполняет первое действие и публикует событие, инициирующее выполнение второго действия на втором сервисе.

  3. Первый сервис отправляет ответ (response) клиенту.

  4. Второй сервис получает событие и выполняет второе действие.

Коммуникация между клиентом и первым сервисом выходит за рамки решаемой проблемы. Считаем, что запрос (request) все-таки пришел на первый сервис, а ответ (response) все-таки будет доставлен клиенту.

Когда первый сервис выполняет действие и публикует событие, возможны несколько точек отказа:

  • Событие не опубликовано. Если изменения в базе данных зафиксированы (commit) до публикации события, то отсутствует гарантия публикации. Второй сервис или даже система обмена сообщениями могут быть недоступны. Как итог, второе действие может остаться невыполненным.

  • Опубликовано лишнее событие. Если событие опубликовано до фиксирования (commit) изменений в базе данных, то не гарантируется изменение данных. База данных может оказаться недоступной. Как итог, фиксация результатов первого действия может не произойти.

Когда второй сервис получает событие и выполняет второе действие, возможны несколько точек отказа:

  • Событие не доставлено. Второй сервис не получил событие. Если отсутствует гарантия доставки события, то второе действие может остаться невыполненным.

  • Событие обработано неоднократно. Есть система доставки событий с несколькими попытками (retries), требующая от сервиса подтверждения (ack). Если второй сервис выполнил второе действие, но не смог оповестить такую систему об успешной обработке, то может быть предпринята еще одна попытка. Как итог, событие может быть обработано несколько раз.

В любом из этих случаев согласованность данных не гарантируется.

Решение

Примечание:
У проблемы существует несколько решений. Например Two-Phase Commit Protocol. В рамках этой статьи будет рассмотрено альтернативное решение — шаблоны Transactional Outbox и Idempotent Consumer.

В идеальном мире, от механизма доставки и обработки событий требуется:

  • Гарантия публикации события ровно один раз (exactly once).

  • Гарантия доставки события ровно один раз (exactly once).

  • Гарантия обработки события ровно один раз (exactly once).

Мне не известен способ реализации этих гарантий в настоящем мире. Однако, можно использовать другую систему гарантий:

  • Гарантия публикации события хотя бы один раз (at least once)

  • Гарантия доставки события хотя бы один раз (at least once)

  • Гарантия обработки события хотя бы один раз (at least once)

  • Гарантия идемпотентной обработки события (idempotency)

Даже если событие будет опубликовано 100 раз, доставлено 50 раз, а обработано 25 раз, то идемпотентность обработки события (idempotency) позволит гарантировать итоговую согласованность (eventual consistency).

За гарантию итоговой согласованности придется заплатить:

  • В случае отказов сети, сервисов или транзакций, вычислительные ресурсы могут быть потрачены впустую, ведь возможно выполнение работы, вообще не влияющей на состояние системы.

  • Гарантии «хотя бы один раз» (at least once) требуют механизма повторов (retries).

  • Иногда требуется реализовать компенсирующую транзакцию, которая отменит (rollback) первое действие на первом сервисе. Во время допустимой задержки между действиями, второе действие может стать невыполнимым в принципе.

Далее будет рассмотрен механизм доставки, оформленный в виде библиотеки event-outbox.

Технологии

Библиотека имеет ряд конкретных зависимостей и пока не позиционируется как универсальное решение для любого стека. Я попытался взять от используемых технологий максимум, чтобы обеспечить наибольшую полезность при наименьших затратах собственного времени.

MongoDB

MongoDB — это основная СУБД, с которой я работаю последние несколько лет. Некоторые её особенности напрямую повлияли на конечное решение:

  • Transactions — гарантирует атомарную согласованность (transactional consistency) при изменении документов в разных коллекциях.

  • Change Streams — позволяет подписаться на изменения в коллекции и снизить нагрузку на базу данных.

  • Partial TTL indexes — позволяет автоматически удалять опубликованные и обработанные события из базы данных.

Для подключения к базе данных используется асинхронный драйвер motor.

Apache Kafka

Apache Kafka — платформа потоковой передачи событий, с которой я не работал (по состоянию на июнь 2024), но изучал ради интереса. Некоторые её особенности также повлияли на конечное решение:

  • Consumer Groups — позволяет организовать независимую обработку одних и тех же событий в разных группах.

  • Consumer Rebalance Protocol — выдает консюмеру (consumer) эксклюзивные права на обработку событий из партишнов топика (topic partitions) в рамках одной группы консюмеров (consumer group) и автоматическое перераспределение при их подключении / отключении.

  • Manual Offset Management — позволяет гарантировать обработку события хотя бы один раз (at least once) за счет фиксирования смещения (offset commit) непосредственно после обработки события.

  • Custom Partitioner — позволяет использовать собственный алгоритм выбора партишна (partition), в который публикуется событие.

Для публикации и потребления (consume) событий используется асинхронный клиент aiokafka.

Pydantic

Библиотека pydantic используется для описания модели данных (data model) публикуемых событий.

Публикация событий

Transactional Outbox

Transactional Outbox — шаблон, гарантирующий публикацию событий хотя бы один раз (at least once). Суть: отделить намерение (intent) от публикации события.

При выполнении действия, в одной транзакции оказываются:

  • Изменения данных, т.е. результат выполнения действия.

  • Намерения опубликовать события.

После успешной фиксации (commit) такой транзакции, в отдельной коллекции базы данных надежно хранятся намерения опубликовать события. Если произойдет отказ транзакции (abort), то такие намерения просто не будут зафиксированы.

Для публикации событий запускается специальный цикл, читающий документы из коллекции с намерениями и публикующий их в систему обмена сообщениями. После подтверждения публикации, событие помечается опубликованными и больше не публикуется.

В случае отказа системы обмена сообщениями (Kafka), событие останется неопубликованным. Попытки опубликовать событие будут предприниматься до тех пор, пока система обмена сообщениями не станет доступна и не подтвердит публикацию события.

В случае отказа сети, система обмена сообщениями (Kafka) не сможет подтвердить публикацию. При следующей попытке публикации, в системе обмена сообщениями может появиться дубликат события.

При остановке цикла, публикующего события, публикация останавливается до тех пор, пока цикл не будет перезапущен. Так как после публикации, события помечаются в базе данных (MongoDB) опубликованными, цикл может восстановить работу с первого неопубликованного события.

За гарантии доставки и целостность данных придется заплатить увеличением нагрузки на базу данных. Её можно немного уменьшить, задействовав специальные механизмы слежения за изменениями.

Сохранение намерений в базу данных

Начнем с кода:

from contextlib import AbstractAsyncContextManager  from motor.motor_asyncio import AsyncIOMotorClientSession from pydantic import BaseModel   class Event(BaseModel):     ...   class EventListener:     def event_occurred(self, event: Event) -> None:         ...   class EventOutbox:     def event_listener(         self, mongo_session: AsyncIOMotorClientSession     ) -> AbstractAsyncContextManager[EventListener]:         ...

Интерфейс EventListener — это синхронный слушатель событий. Он объявляет единственный метод event_occurred, который принимает event — произошедшее событие.

Класс EventOutbox используется как менеджер контекста (context manager) таких слушателей. С помощью метода event_listener можно асинхронно открыть контекст, передав сессию (session) базы данных:

from motor.motor_asyncio import AsyncIOMotorClient  from event_outbox import Event, EventOutbox  async def handler(     mongo_client: AsyncIOMotorClient,     outbox: EventOutbox, ) -> None:     db = mongo_client.get_default_database()     async with await mongo_client.start_session() as session:         async with outbox.event_listener(session) as listener:             await db["collection"].insert_one({}, session=session)             listener.event_occurred(                 Event(                     topic="bounded_context",                     content_schema="EventOccurred",                 )             ) 

Сессия базы данных нужна для того, чтобы намерения опубликовать события попали в одну транзакцию с результатом выполнения действия. Таким образом, контекстный менеджер (context manager) EventOutbox.event_listener сам открывает и фиксирует (commit) транзакцию.

Класс Event — это модель данных pydantic. Она используется для представления (serialize/dump) события в json и передачи по сети. Новый класс событий предполагается объявлять наследником класса Event и описывать в нем все данные события:

from typing import Literal  from event_outbox import Event  class EventOccurred(Event):     topic: Literal["bounded_context"] = "bounded_context"     content_schema: Literal["EventOccurred"] = "EventOccurred"     extra_data: int 

Изменения вставляются в коллекцию пачкой (insert many) при выходе из контекста:

from contextlib import asynccontextmanager from typing import Any, AsyncIterator, Mapping  from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection  outbox: AsyncIOMotorCollection = ...   class EventListener:     ...   class EventOutbox:     mongo_outbox: AsyncIOMotorCollection      @asynccontextmanager     async def event_listener(         self, mongo_session: AsyncIOMotorClientSession     ) -> AsyncIterator[EventListener]:         documents: list[Mapping[str, Any]] = ...         listener: EventListener = ...         async with self.mongo_session.start_transaction():           yield listener           await self.mongo_outbox.insert_many(               documents,               session=mongo_session,           ) 

Чтение намерений из базы данных

При запуске или перезапуске цикла публикации, все накопившиеся события будут последовательно прочитаны через обычный find и опубликованы.

Когда документов в коллекции не остается, необходимо каким-то образом подождать появления новых. Классическое решение — организовать поллинг (polling) коллекции. Change Streams позволяют реализовать альтернативу поллингу (polling) и снизить нагрузку на базу данных. Можно подписаться на операции insert в коллекции и ждать, когда MongoDB сама оповестит о новом документе.

from typing import Any, AsyncIterator, Mapping  from bson import Timestamp from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection   class EventPublisher:     mongo_session: AsyncIOMotorClientSession     mongo_outbox: AsyncIOMotorCollection      async def subscribe_to_change_stream(         self, start_at_operation_time: Timestamp     ) -> AsyncIterator[Mapping[str, Any]]:         async with self.mongo_outbox.watch(             [{"$match": {"operationType": {"$in": ["insert"]}}}],             start_at_operation_time=start_at_operation_time,             session=self.mongo_session,         ) as change_stream:             async for change_event in change_stream:                 yield change_event["fullDocument"] 

Эксклюзивные права на публикацию

Consumer Rebalance Protocol позволяет Kafka автоматически назначать (assign) консюмерам (consumer) партишны топиков (topic partition) таким образом, чтобы из партишна одновременно читал только один консюмер группы.

Что именно было назначено консюмеру (consumer), можно узнать во время выполнения:

from aiokafka import AIOKafkaConsumer  kafka_consumer: AIOKafkaConsumer = ... assignment = kafka_consumer.assignment()

Если консюмер (consumer) и продюсер (producer) запускаются в одном процессе, то способность Kafka назначать партишны консюмерам (consumer) может быть использована для иной цели — предоставления продюсерам (producer) эксклюзивного права на публикацию в партишны (partition):

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer from pydantic import BaseModel   class Event(BaseModel):     topic: str     partition_key: int   class EventPublisher:     kafka_consumer: AIOKafkaConsumer     kafka_producer: AIOKafkaProducer      async def publish_event(self, event: Event) -> None:         partition = event.partition_key % len(             self.kafka_consumer.partitions_for_topic(event.topic)         )         assignment = self.kafka_consumer.assignment()         if any(             topic_partition.partition == partition             for topic_partition in assignment             if topic_partition.topic == event.topic         ):             await self.kafka_producer.send_and_wait(                 event.topic,                 event.model_dump_json().encode(),                 partition=partition,             ) 

Таким образом, существует возможность запустить несколько параллельных процессов публикации событий и использовать Kafka для разделения работы между ними. Каждый из продюсеров (producuer) публикует события только в выделенные ему партишны топика (topic partitions).

Доставка событий

При попадании события в кластер, доставка хотя бы один раз (at least once) гарантируется самой Kafka.

Обработка событий

Idempotent Consumer

Idempotent Consumer — шаблон, гарантирующий идемпотентность (idempotency) обработки событий. Суть: зафиксировать (commit) факт обработки вместе с результатом обработки.

Когда событие поступает из сети, оно сохраняется в базу данных. Выполняется обработка. В одну транзакцию попадают:

  • Изменение флага у входящего события.

  • Изменения данных, т.е. результат обработки.

  • Намерения опубликовать другие события.

После фиксации (commit) такой транзакции, событие считается обработанным и больше не обрабатывается.

Если по какой-то причине произошла одновременная обработка одного и того же события, то будет зафиксирована только одна транзакция, а вторая будет отменена (abort).

Если необходимо выполнить запрос во внешнюю систему, то рекомендуется передать ключ идемпотентности. Тогда повторная обработка событий гарантировано не приведет к повторному выполнению действия во внешней системе.

Обработчик событий

Для начала рассмотрим интерфейс обработчика событий — протокол EventHandler:

from typing import Protocol  from motor.motor_asyncio import AsyncIOMotorClientSession   class Event:     topic: str     content_schema: str   class EventOutbox:     ...   class EventHandler(Protocol):     async def __call__(         self,         event: Event,         mongo_session: AsyncIOMotorClientSession,         /,     ) -> None:         pass 

С ним совместима функция, принимающая 2 аргумента — обрабатываемое событие и сессию базы данных с открытой транзакцией, в которой необходимо выполнять запросы.

Событие приходит в обработчик как экземпляр класса Event. Библиотека не реализует маршрутизацию (routing) по типам событий. Специального механизма внедрения зависимостей в обработчик тоже нет. Для внедрения экземпляра EventOutbox или любых других зависимостей, можно написать небольшой lambda-адаптер:

from motor.motor_asyncio import AsyncIOMotorClientSession  from event_outbox import Event, EventHandler, EventOutbox   async def event_handler(     event: Event,     session: AsyncIOMotorClientSession,     outbox: EventOutbox,     answer: int, ) -> None:     match (event.topic, event.content_schema):         case ("bounded_context", "EventOccurred"):             ...   def create_adapter() -> EventHandler:     outbox: EventOutbox = ...     return lambda event, session: (         event_hander(             event,             session,             outbox,             answer=42,         )     ) 

Эксклюзивные права на обработку

Consumer Rebalance Protocol выдает консюмеру (consumer) в группе эксклюзивные права на обработку событий из назначенных ему партишнов топиков (topic partitions).

Сохранение входящего события в базу данных

Пришедшее из Kafka событие сохраняется в специальную коллекцию входящих событий. MongoDB позволяет использовать словарь в качестве идентификатора, чтобы воспользоваться встроенным (default) уникальным индексом на _id для обеспечения уникальности по нескольким полям.

from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection from pydantic import BaseModel   class Event(BaseModel):     topic: str     content_schema: str     idempotency_key: str   class EventConsumer:     mongo_inbox: AsyncIOMotorCollection     mongo_session: AsyncIOMotorClientSession      async def handle_events(self) -> None:         while True:             event: Event = ...             document_id = event.model_dump(                 mode="json",                 include={"topic", "content_schema", "idempotency_key"},             )             await self.mongo_inbox.insert_one(                 {"_id": document_id, "handled": False},                 session=self.mongo_session,             )             ... 

Оптимистическая блокировка

Перед обработкой события открывается транзакция, в которой необработанное событие помечается обработанным. Фактически, это оптимистическая блокировка по полю handled. При наличии нескольких конкурирующих транзакций, изменяющих флаг handled, зафиксирована (commit) будет только одна.

from datetime import UTC, datetime from typing import Any, Mapping, Protocol  from motor.motor_asyncio import AsyncIOMotorClientSession, AsyncIOMotorCollection   class Event:     ...   class EventHandler(Protocol):     async def __call__(         self,         event: Event,         mongo_session: AsyncIOMotorClientSession,         /,     ) -> None:         pass   class EventConsumer:     mongo_session: AsyncIOMotorClientSession     mongo_inbox: AsyncIOMotorCollection     event_handler: EventHandler      async def handle_event(self, document_id: Mapping[str, Any], event: Event) -> None:         async with self.mongo_session.start_transaction():             result = await self.mongo_inbox.update_one(                 {"_id": document_id, "handled": False},  # <-- Here                 {                     "$set": {                         "handled": True,                         "handled_at": datetime.now(tz=UTC),                     }                 },                 session=self.mongo_session,             )             if result.modified_count:                 await self.event_handler(event, self.mongo_session) 

Идемпотентный запрос к внешней системе

Событие Event содержит ключ идемпотентности (idempotency key). Ключ генерируется при создании экземпляра Event как hex-представление UUID4. Этот ключ может использоваться для идемпотентных запросов к внешним системам:

from motor.motor_asyncio import AsyncIOMotorClientSession  from event_outbox import Event   async def send_email(idempotency_key: str) -> None:     ...   async def event_handler(     event: Event,     session: AsyncIOMotorClientSession, ) -> None:     await send_email(event.idempotency_key) 

Подтверждение обработки

Manual Offset Management позволяет вручную управлять смещением (commit offset), чтобы зафиксировать в Kafka факт обработки события непосредственно после обработки. Для этого консюмер создается с флагом enable_auto_commit=False:

from aiokafka import AIOKafkaConsumer   class EventConsumer:     kafka_consumer: AIOKafkaConsumer      async def handle_events(self) -> None:         while True:             kafka_consumer_record = await self.kafka_consumer.getone()             ...             await self.kafka_consumer.commit()  # <-- Here   async def initialize() -> None:     """     Your service initialization code     """     topics: list[str] = ...     bootstrap_servers: str = ...     group_id: str = ...     async with AIOKafkaConsumer(         *topics,         bootstrap_servers=bootstrap_servers,         group_id=group_id,         auto_offset_reset="earliest",         enable_auto_commit=False,  # <-- Here     ) as kafka_consumer:         ... 

В случае перезапуска цикла обработки событий, цикл продолжит работу с первого необработанного события.

Удаление устаревших событий

Чтобы избежать повторной обработки, необходимо достаточно долго держать в базе данных информацию об обработанных событиях. В связи с этим, в базе данных накапливается большое количество устаревших документов. Для удаления устаревших данных используется Partial TTL indexes. Событие автоматически удаляется из базы данных после публикации или обработки, через заранее определенное время. Это позволяет переложить задачу очистки базы данных на MongoDB:

from datetime import timedelta  from motor.motor_asyncio import AsyncIOMotorCollection   class EventOutbox:     mongo_outbox: AsyncIOMotorCollection     mongo_inbox: AsyncIOMotorCollection      async def create_indexes(self) -> None:         await self.mongo_outbox.create_index(             "published_at",             name="expiration",             partialFilterExpression={"published": True},             expireAfterSeconds=timedelta(days=1).total_seconds(),         )         await self.mongo_inbox.create_index(             "handled_at",             name="expiration",             partialFilterExpression={"handled": True},             expireAfterSeconds=timedelta(days=1).total_seconds(),         ) 

Инициализация и запуск

Асинхронные циклы публикации в Kafka и обработки событий из Kafka запускаются в одном процессе. Например, их можно запустить вместе с HTTP-фреймворком. Тогда приложение будет состоять из трех основных циклов:

  • Цикл обработки HTTP-запросов.

  • Цикл публикации событий.

  • Цикл идемпотентной обработки событий.

Например, при использовании HTTP-фреймворка FastAPI, инициализацию можно выполнить в lifespan:

from contextlib import asynccontextmanager from typing import AsyncIterator  from aiokafka import AIOKafkaConsumer, AIOKafkaProducer from fastapi import FastAPI from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorClientSession  from event_outbox import Event, EventOutbox   async def event_handler(     event: Event,     session: AsyncIOMotorClientSession,     outbox: EventOutbox, ) -> None:     ...   @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]:     mongo_client: AsyncIOMotorClient = ...     kafka_producer: AIOKafkaProducer = ...     kafka_consumer: AIOKafkaConsumer = ...      event_outbox = EventOutbox(         mongo_client,         kafka_producer,         kafka_consumer,     )     await event_outbox.create_indexes()     async with event_outbox.run_event_handler(         lambda event, session: event_handler(             event,             session,             event_outbox,         )     ):         yield  def create_app() -> FastAPI:     return FastAPI(lifespan=lifespan) 

Заключение

Реализация итоговой согласованности (eventual consistency) за счет гарантий доставки и идемпотентной обработки — это мощный механизм, который часто ускользает из виду. Возможность запускать циклы обработки и публикации в одном процессе с HTTP-сервером позволяет интегрировать решение в проект без запуска дополнительных процессов (worker). Использование механизмов отслеживания изменений в базе для ожидания новых событий является альтернативой поллингу (polling) и снижает нагрузку на базу данных. Эксплуатирование механизма назначения партишнов (partition) между консюмерами (consumer) позволяет также разделить работу по публикации событий между продюсерами (producer). Автоматическое удаление устаревших событий из базы данных позволяет снизить затраты на хранение.

В целом, я доволен реализованным решением. Мне очень хотелось бы услышать мнение сообщества. Я первый раз в open-source. Буду рад, если проект окажется полезным.

Если после прочтения у вас остались вопросы, предлагаю ознакомиться с исходным кодом или обсудить в комментариях.

Спасибо за внимание!


ссылка на оригинал статьи https://habr.com/ru/articles/820867/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *