
Привет, Хабр! На связи системный аналитик Илья Глазунов и разработчик Захар Корсаков. Мы работаем с платформой карточного хранилища Т-Банка.
Когда данных становится много, а архитектура обязана оставаться простой, классические решения начинают упираться в ограничения стека. В нашем случае нужно было организовать асинхронную доставку событий из Cassandra, но без CDC, без отдельного брокера и без разрастания зоопарка технологий. Так мы пришли к Non-transactional Outbox — схеме, которая по духу напоминает Kafka Inside the Database, но живет в рамках наших ограничений и требований к отказоустойчивости.
В статье покажем, что такое Outbox, с чем его едят и как готовят. Расскажем допущения по реализации в нашем случае, вытекающие из отличий от классического паттерна Outbox. Обрисуем ту реализацию, которую сделали своими руками, и покажем бенчмарки нагрузочного тестирования. В конце поделимся списком вопросов, которые возникали чаще всего при обсуждении решения внутри банковского сообщества Apache Cassandra.
Какая стояла задача
В какой-то момент мы столкнулись с задачами, которые в нашем понимании можно было решить через один и тот же паттерн. В перечень задач входили:
-
Асинхронная отправка обновлений по данным и новых данных в корпоративное DWH.
-
Переотправка данных за определенный промежуток времени в тот же DWH в рамках DRP — Disaster Recovery Plan.
-
Направлять синхронно созданные данные из одного сервиса в etl-процесс между другими сервисами нашей системы, не ухудшая Latency и Response Ttime для синхронно предоставляемой услуги создания новой карточки.
В результате мозгового штурма мы решили, что для наших целей можем использовать адаптированный вариант outbox-паттерна, который концептуально закрывал бы все эти задачи. Но требовалось определенное переосмысление.
Что такое Outbox
Паттерн Outbox нашел широкое применение, хотя изначально его использование подразумевалось в основном в событийно-управляемых системах. Центральное звено шаблона — обычно таблица, содержащая заметные изменения, внесенные во внутренние данные хранилища. При этом каждое изменение представляет собой отдельную строку в этой таблице.
Важные моменты устройства паттерна:
-
Каждое событие имеет уникальный идентификатор и строго определенную временную метку. В некоторых кейсах возможно объединение в один атрибут на базе неиндексного токена, если события не получают обновлений. Примером такого токена может быть Snowflake, Timestamp-based UUID и так далее.
-
Обновление внутренних таблиц хранилища и outbox-таблиц должны быть объединены в единую транзакцию. Безтранзакционные реализации могут привести к труднодетектируемым расхождениям.
-
Записи в outbox-таблице живут до момента подтверждения их потребления консьюмерами.
-
Outbox-таблица не подразумевает соотношения схемы данных с внутренней таблицей 1-к-1. Часто она представляет собой вообще некоторое blob-хранилище — для этого и нужен сериализатор (до, как на схеме, или после).
Что дает использование outbox-паттерна:
-
Изоляцию внутренних моделей данных — дополнительный слой абстракции для взаимодействия с потребителями, что снижает степень Coupling.
-
Возможность денормализации данных на выход
-
Асинхронную природу взаимодействия с потребителями, при этом с предоставлением больших гарантий, чем в Kafka и подобных решениях (благодаря отсутствию жестких ограничений по Retention Policy, которые часто встречаются в крупных коммерческих решениях).
Асинхронная природа позволяет сгладить пиковые нагрузки на консьюмеров по сравнению с синхронными способами передачи за счет того, что сами консьюмеры устанавливают скорость потребления данных из outbox-таблиц
Но у всего есть и оборотная сторона медали. Outbox не исключение:
-
Если у сервиса не было БД, теперь она необходима. Отсутствие БД возможно, например, если сервис выступает маршрутизатором с элементами промежуточного преобразования.
-
Транзакционная природа паттерна может влиять на производительность — как процесса предоставления бизнес-ценности в целом, так и локального процесса обработки со стороны хранилища данных.
Все описанное применимо к классическому стеку на основе реляционных систем управления базами данных. Наш кейс имеет несколько иной характер.
Отличия от классики еще не авангардизм
Во время реализации у нас было четыре допущения.
Допущение 1. У нас вышел не то чтобы outbox-паттерн в полном его смысле, а скорее исходящая очередь. От классического Outbox мы взяли идею раздельной обработки потоков данных, асинхронно по отношению друг к другу — входящего и исходящего.
Так как Apache Cassandra не поддерживает в полном понимании механику транзакци, LWT не дают тех гарантий, которых принято ожидать от инструмента реализации транзакций. Реализовать классический вариант паттерна изначально не представлялось возможным.
Хотя А. Беллемар в книге «Создание событийно-управляемых микросервисов. Масштабирование использования организационных данных» упоминал Outbox без транзакционной составляющей. Обычно он транзакционный, но сделали нетранзакционную версию. Такой вариант имеет некоторое количество минусов.
Поэтому, если кто-то не согласен с тем, что мы называем Outbox, представьте, что мы говорим об исходящей очереди на Cassandra, а не об outbox-паттерне.
Допущение 2. Не было необходимости в строгом поддержании ограничений Exactly Once. Мы работаем не с событиями в полном смысле этого слова: не так важно разделять по времени две одинаковые записи. Важнее транслировать последнее актуальное состояние той или иной записи во внешние по отношению к нам системы, да и во внутренние сервисы системы тоже.
Мы придерживаемся семантики At Least Once, а на стороне клиента должна быть реализована идемпотентная обработка (она и реализована).
Допущение 3. Некоторый низкий процент пропуска данных из Outbox допустим. В основном пропуски допускаются из-за возможных сбоев на ДЦ. Сбои могут привести к тому, что при возобновлении ДЦ некорректно соберется кворум из-за отработки антиэнтропийных механик Cassandra. Это актуально для нашего кейса, так как мы располагаем возможностью разворота БД лишь в контурах двух ДЦ (ограничение инфраструктуры). Решается пропуск данных путем периодической сверки источников и переотправки событий в Outbox. При этом потеря данных перед помещением в Outbox предотвращается путем retry-механик.
Допущение 4. Для сокращения пропусков данных из-за перескока каретки требуется некоторый временной лаг между записью и чтением. Мы установили его в размере часа. Эмпирические наблюдения показывают, что диапазон может начинаться с 10 минут.
В нашем процессе актуализация данных в иных сервисах и внешних системах не так важна, а если важна, то есть возможность создать данные в других частях системы с помощью синхронных вызовов.
Посмотрели основные ограничения нашего кейса — теперь рассмотрим реализацию. Мы не настаиваем, что наш вариант — единственный верный и правильный путь решения подобных кейсов. Представленная концепция — опция, которой мы решили воспользоваться из-за факторов вокруг системы и нашего предыдущего опыта.
Что у нас получилось
Рассмотрим, как наполняется и опустошается Outbox, а также как устроена унифицированная схема данных для этого паттерна на базе Cassandra. Рассмотрим основные happy-paths.
Мы поддерживаем оба варианта взаимодействия: синхронный и асинхронный. Главный смысл схем — показать, что подтверждение успешной обработки данных мы отправляем только после подтвержденной успешной записи информации. Подтверждение отправляется в основную и в outbox-таблицы, то есть во все компоненты основного хранилища данных — Cassandra.
Если запись не была произведена полностью, мы отправляем продюсеру данных негативный ответ. Аналогично в случае падения сервиса перед отправкой ответа, но после записи. Тут вступает в дело допущение об идемпотентной обработке запросов на стороне сервиса.
Отдельно отметим, что при наполнении Outbox LWT не используются.
Прежде чем рассматривать процесс разбора данных из Outbox, разберемся с устройством хранения событий. Мы пришли к выводу, что самым эффективным способом реализации шаблона будет воссоздать концепцию Apache Kafka, которая давно и успешно решила проблему построения очередей сообщений.
Мы сделали лог-таблицы и таблицу с мета-данными а-ля как в Kafka. Ее еще можно назвать исходящей очередью. Так мы обеспечили параллельность потребления данных и горизонтальное распределение нагрузок за счет шардированности Cassandra. Мы не добавляли дополнительную точку отказа и не упирались в Retention Policy самой Kafka, но воспроизвели логику ее топиков вплоть до политики репликации, структуры метаданных и прочих прелестей брокера.
Таблица OUTBOX_EVENTS содержит те самые события для потребления. Для деления на разные виды событий мы используем поле topic. Топики разбиваем на партиции по partition_id, который выбирается на основании key. Чтобы избежать hot-partitions, мы добавляем партицирование партицирования в виде bucket.
В нашем случае bucket — переведенная в строку timestamp в ISO-формате «год-месяц-день-час-минута». Бакет при этом можно сделать вычисляемым на основании topic + partition + offset, а можно сделать пулом констант.
Offset — положение конкретного элемента в топике, сделан в виде UUID, часть битов которого формируется на основании метки времени.
Key + value — смысловые поля, которые и формируют ивент. В нашем случае blobs, чтобы можно было положить что угодно. Для верной десериализации используем payload_version, которая ссылается на конкретную схему. Headers может содержать в себе примерно все что угодно для удобства. Мы, например, используем под разметку типов сообщения, чтобы консьюмеры могли пропускать ненужные сообщения.
Таблица CONSUMER_LOCK предназначена для координации потребителей паттерна. Group_id аналогично группе в Kafka — сущность, объединяющая консьюмеров в общность. Topic и partition_id работают так же, как в таблице OUTBOX_EVENTS. Offset — текущий указатель лока группы, locked_till показывает время, до которого лок держится за группой на диапазон значений.
Зная схему данных, погрузиться в сам процесс потребления данных из исходящих таблиц не представляется сложным. Рассмотрим этот процесс.
В процессе потребления событий есть два места, где используются LWT: при захвате лока за консьюмер группой или его обновлении и при коммите офсета. Все начинается с лока по primary key в CONSUMER_LOCK — bucket нужен для расчета offset: оба основаны на временных метках.
После установления лока вычитываются соответствующие события по фильтрации offset, при этом проверяется схема десериализации и хедеры на предмет учета сообщения. Затем десериализованные сообщения отправляются получателям (неважно, синхронно или асинхронно). После подтвержденной отправки ивента коммитится путем передвижения каретки на соответствующую позицию и им присваивается TTL.
Важный момент: между заполнением и чтением outbox должен быть временной лаг для гарантированного перебора всех элементов за выбранный интервал по timestamp. Связано это с процессами репликации внутри Cassandra. В нашем кейсе нет строгих требований и ограничений по скорости разбора исходящей таблицы, так что мы установили задержку в час.
Вот и весь велосипед 🙂
Результаты НТ вместо выводов
Конфигурация: 1 дц, 3 ноды по 4 CPU 16 RAM. Замеряли сохранение единичных (вне батча) карт, в трафике ~90% новых записей, которых нет в БД. А еще замеряли влияние на чтение.
Контрольный замер без реализованного outbox-паттерна.
Замер с реализованным outbox-паттерном.
При проведении нагрузочного тестирования нас интересовало возможное влияние на предоставляемые услуги записи, так как они имеют ограничения на Response Time и пропускную способность, измеряемую в количестве RPS.
При этом разбор входящей очереди не очень интересовал с точки зрения скорости (а именно там используются тяжеловесные LWT), так как ограничения по скорости доставки данных среди потребителей очень мягкие и не лежат на критическом пути.
По графикам можно сделать выводы:
-
Response Time по основным перцентилям значительных изменений не претерпел. Мы вынесли за скобки аномальные всплески 1s на тесте без Outbox, так как пришли к выводу, что это скорее связано с перебоями в сети.
-
В продолжение предыдущего пункта мы перешли на LeveledCompactionStrategy для двух таблиц в исходящей очереди, что позволило вписаться в требуемый Latency.
-
Количество i-операций, которое способна пропускать через себя как нода-координатор, так и ноды-реплики, не изменилось. Важно, что количество i/o операций на графиках — это нагрузка на ноду-координатор с учетом репликации.
По графикам видно, что какой-либо деградации при поднятом паттерне не отмечается. А значит, мы достигли требуемого функционального результата без видимой деградации со стороны БД.
При дальнейшем росте количества запросов на запись мы ожидаем деградации, которая случилась бы и без исходящей очереди. Это может произойти из-за того, что нода-координатор станет узким горлышком при обработке запросов на запись. Как именно координатор работает, описали в предыдущей статье.
Вопросы, с которыми мы сталкивались
Почему бы не использовать Kafka/Rabbit/PostgreSQL?
Ответ делится на несколько частей:
-
Любой новый компонент в цепочке добавляет компонентную связанность и точку отказа, которая, в свою очередь, приводит к тому, что все равно нужен Fallback. Если Kafka уйдет в простой (а она, как и любой другой компонент, может уйти), на нее все равно нужно продумывать дополнительную механику альтернативного потока — и желательно на основном стеке. Такой стек для нас — Cassandra.
-
Специфика нашего банка такова, что на Kafka иногда дополнительные квоты могут долго выдаваться долго. Так как мы делаем много крутых продуктов и платформ, которым требуются мощности, а ЦОДы не способны увеличивать свою мощность так же быстро.
-
Нам желательно иметь легко масштабируемый в обе стороны инструмент, так как массивная нагрузка нужна не на постоянной основе, но расширение может потребоваться в любой момент. А, как мы знаем, Kafka разжимается, но не сжимается (по крайней мере в плане партиций), что приведет к простою значительных ресурсов большую часть времени.
-
Мы не хотели получить разрастание «зоопарка», так что решили попробовать справиться имеющимися средствами.
Почему не использовать CDC?
У нас в Т-Банке высоко развита культура предоставления инфраструктуры для проектов и платформ в режиме as Service — когда выделенная, централизованная команда отвечает за поддержание работоспособности того или иного стека. Это избавляет команды от необходимости поддержки инфраструктуры на стороне этих самых проектов и платформ.
Оборотная сторона условия ведет к тому, что у продуктовых и платформенных команд сужен арсенал влияния на эти самые компоненты и использования их неочевидных возможностей. В частности, для нас закрыт CDC как инструмент и взаимодействовать с ним мы не можем.
Что, если какой-то элемент батча битый? Остановится ли разгребания или запись в Outbox?
Нет, мы просто положим элемент в retry-очередь, которая и так существует. На самом деле даже виртуально 2: для автоматического разбора и для ручного по достижению Retry Counter.
Если у меня не так много данных и нет риска Hot Partitioning, что мне делать с бакетом?
При записи можно его сделать константой. Ну или захардкодить небольшое количество бакетов и сделать распределение по Round Robin. И бакетами могут быть обычные int-/string-значения, не обязательно делать их Timestamp-based.
На какую нагрузку на чтение или коммит рассчитана реализация?
При работе из коробки, я бы сказал, проблемы начнутся при 1 500—2 000 RPS на ноду, которые не будут решатся горизонтальным скейлингом. БД будет упираться в работу с системными таблицами и работу координатора запросов. Так что там уже придется что-то придумывать с Compaction Policy. У нас нагрузка сильно меньше. При этом размер батча — 1 000—10 000.
Какой уровень записи и чтения используете?
Мы используем для обоих процессов QUORUM, но считаем, что при определенных обстоятельствах можно играть и с LOCAL_QUORUM.
Что, если ЦОД или пара ЦОДов «отдыхают» и кворум не собирается?
Мы отключаем разбор outbox-таблиц и переходим в режим write-only по LOCAL_QUOROM. Это позволяет в дальнейшем с помощью Gossip Protocol и антиэнтропийных механизмов успешно реплицировать данные при восстановлении работы ЦОДа.
ссылка на оригинал статьи https://habr.com/ru/articles/1041032/