Велосипед для жрицы Трои, или как мы переизобретали Outbox для нетранзакционной базы данных

от автора

Привет, Хабр! На связи системный аналитик Илья Глазунов и разработчик Захар Корсаков. Мы работаем с платформой карточного хранилища Т-Банка.

Когда данных становится много, а архитектура обязана оставаться простой, классические решения начинают упираться в ограничения стека. В нашем случае нужно было организовать асинхронную доставку событий из Cassandra, но без CDC, без отдельного брокера и без разрастания зоопарка технологий. Так мы пришли к Non-transactional Outbox — схеме, которая по духу напоминает Kafka Inside the Database, но живет в рамках наших ограничений и требований к отказоустойчивости.

В статье покажем, что такое Outbox, с чем его едят и как готовят. Расскажем допущения по реализации в нашем случае, вытекающие из отличий от классического паттерна Outbox. Обрисуем ту реализацию, которую сделали своими руками, и покажем бенчмарки нагрузочного тестирования. В конце поделимся списком вопросов, которые возникали чаще всего при обсуждении решения внутри банковского сообщества Apache Cassandra.

Какая стояла задача

В какой-то момент мы столкнулись с задачами, которые в нашем понимании можно было решить через один и тот же паттерн. В перечень задач входили:

  • Асинхронная отправка обновлений по данным и новых данных в корпоративное DWH.

  • Переотправка данных за определенный промежуток времени в тот же DWH в рамках DRP — Disaster Recovery Plan. 

  • Направлять синхронно созданные данные из одного сервиса в etl-процесс между другими сервисами нашей системы, не ухудшая Latency и Response Ttime для синхронно предоставляемой услуги создания новой карточки.

В результате мозгового штурма мы решили, что для наших целей можем использовать адаптированный вариант outbox-паттерна, который концептуально закрывал бы все эти задачи. Но требовалось определенное переосмысление.

Что такое Outbox

Паттерн Outbox нашел широкое применение, хотя изначально его использование подразумевалось в основном в событийно-управляемых системах. Центральное звено шаблона — обычно таблица, содержащая заметные изменения, внесенные во внутренние данные хранилища. При этом каждое изменение представляет собой отдельную строку в этой таблице. 

Примерная работа outbox-паттерна

Примерная работа outbox-паттерна

Важные моменты устройства паттерна:

  • Каждое событие имеет уникальный идентификатор и строго определенную временную метку. В некоторых кейсах возможно объединение в один атрибут на базе неиндексного токена, если события не получают обновлений. Примером такого токена может быть Snowflake, Timestamp-based UUID и так далее.

  • Обновление внутренних таблиц хранилища и outbox-таблиц должны быть объединены в единую транзакцию. Безтранзакционные реализации могут привести к труднодетектируемым расхождениям.

  • Записи в outbox-таблице живут до момента подтверждения их потребления консьюмерами.

  • Outbox-таблица не подразумевает соотношения схемы данных с внутренней таблицей 1-к-1. Часто она представляет собой вообще некоторое blob-хранилище — для этого и нужен сериализатор (до, как на схеме, или после).

Что дает использование outbox-паттерна:

  1. Изоляцию внутренних моделей данных — дополнительный слой абстракции для взаимодействия с потребителями, что снижает степень Coupling.

  2. Возможность денормализации данных на выход

  3. Асинхронную природу взаимодействия с потребителями, при этом с предоставлением больших гарантий, чем в Kafka и подобных решениях (благодаря отсутствию жестких ограничений по Retention Policy, которые часто встречаются в крупных коммерческих решениях). 

Асинхронная природа позволяет сгладить пиковые нагрузки на консьюмеров по сравнению с синхронными способами передачи за счет того, что сами консьюмеры устанавливают скорость потребления данных из outbox-таблиц

Но у всего есть и оборотная сторона медали. Outbox не исключение:

  1. Если у сервиса не было БД, теперь она необходима. Отсутствие БД возможно, например, если сервис выступает маршрутизатором с элементами промежуточного преобразования.

  2. Транзакционная природа паттерна может влиять на производительность — как процесса предоставления бизнес-ценности в целом, так и локального процесса обработки со стороны хранилища данных.

Все описанное применимо к классическому стеку на основе реляционных систем управления базами данных. Наш кейс имеет несколько иной характер.

Отличия от классики еще не авангардизм

Во время реализации у нас было четыре допущения.

Допущение 1. У нас вышел не то чтобы outbox-паттерн в полном его смысле, а         скорее исходящая очередь. От классического Outbox мы взяли идею раздельной обработки потоков данных, асинхронно по отношению друг к другу — входящего и исходящего. 

Так как Apache Cassandra не поддерживает в полном понимании механику транзакци, LWT не дают тех гарантий, которых принято ожидать от инструмента реализации транзакций. Реализовать классический вариант паттерна изначально не представлялось возможным. 

Хотя А. Беллемар в книге «Создание событийно-управляемых микросервисов. Масштабирование использования организационных данных» упоминал Outbox без транзакционной составляющей. Обычно он транзакционный, но сделали нетранзакционную версию. Такой вариант имеет некоторое количество минусов. 

Поэтому, если кто-то не согласен с тем, что мы называем Outbox, представьте, что мы говорим об исходящей очереди на Cassandra, а не об outbox-паттерне. 

Допущение 2. Не было необходимости в строгом поддержании ограничений Exactly Once. Мы работаем не с событиями в полном смысле этого слова: не так важно разделять по времени две одинаковые записи. Важнее транслировать последнее актуальное состояние той или иной записи во внешние по отношению к нам системы, да и во внутренние сервисы системы тоже. 

Мы придерживаемся семантики At Least Once, а на стороне клиента должна быть реализована идемпотентная обработка (она и реализована).

Допущение 3. Некоторый низкий процент пропуска данных из Outbox допустим. В основном пропуски допускаются из-за возможных сбоев на ДЦ. Сбои могут привести к тому, что при возобновлении ДЦ некорректно соберется кворум из-за отработки антиэнтропийных механик Cassandra. Это актуально для нашего кейса, так как мы располагаем возможностью разворота БД лишь в контурах двух ДЦ (ограничение инфраструктуры). Решается пропуск данных путем периодической сверки источников и переотправки событий в Outbox. При этом потеря данных перед помещением в Outbox предотвращается путем retry-механик.

Допущение 4. Для сокращения пропусков данных из-за перескока каретки требуется некоторый временной лаг между записью и чтением. Мы установили его в размере часа. Эмпирические наблюдения показывают, что диапазон может начинаться с 10 минут. 

В нашем процессе актуализация данных в иных сервисах и внешних системах не так важна, а если важна, то есть возможность создать данные в других частях системы с помощью синхронных вызовов.

Посмотрели основные ограничения нашего кейса — теперь рассмотрим реализацию. Мы не настаиваем, что наш вариант — единственный верный и правильный путь решения подобных кейсов. Представленная концепция — опция, которой мы решили воспользоваться из-за факторов вокруг системы и нашего предыдущего опыта.

Что у нас получилось

Рассмотрим, как наполняется и опустошается Outbox, а также как устроена унифицированная схема данных для этого паттерна на базе Cassandra. Рассмотрим основные happy-paths. 

Sequence diagram процесса наполнения Outbox (асинхронный и синхронный сценарии)

Sequence diagram процесса наполнения Outbox (асинхронный и синхронный сценарии)

Мы поддерживаем оба варианта взаимодействия: синхронный и асинхронный. Главный смысл схем — показать, что подтверждение успешной обработки данных мы отправляем только после подтвержденной успешной записи информации. Подтверждение отправляется в основную и в outbox-таблицы, то есть во все компоненты основного хранилища данных — Cassandra. 

Если запись не была произведена полностью, мы отправляем продюсеру данных негативный ответ. Аналогично в случае падения сервиса перед отправкой ответа, но после записи. Тут вступает в дело допущение об идемпотентной обработке запросов на стороне сервиса.

Отдельно отметим, что при наполнении Outbox LWT не используются.

Схема данных для outbox-паттерна

Схема данных для outbox-паттерна

Прежде чем рассматривать процесс разбора данных из Outbox, разберемся с устройством хранения событий. Мы пришли к выводу, что самым эффективным способом реализации шаблона будет воссоздать концепцию Apache Kafka, которая давно и успешно решила проблему построения очередей сообщений. 

Мы сделали лог-таблицы и таблицу с мета-данными а-ля как в Kafka. Ее еще можно назвать исходящей очередью. Так мы обеспечили параллельность потребления данных и горизонтальное распределение нагрузок за счет шардированности Cassandra. Мы не добавляли дополнительную точку отказа и не упирались в Retention Policy самой Kafka, но воспроизвели логику ее топиков вплоть до политики репликации, структуры метаданных и прочих прелестей брокера.

Таблица OUTBOX_EVENTS содержит те самые события для потребления. Для деления на разные виды событий мы используем поле topic. Топики разбиваем на партиции по partition_id, который выбирается на основании key. Чтобы избежать hot-partitions, мы добавляем партицирование партицирования в виде bucket. 

В нашем случае bucket — переведенная в строку timestamp в ISO-формате «год-месяц-день-час-минута». Бакет при этом можно сделать вычисляемым на основании topic + partition + offset, а можно сделать пулом констант.

Offset — положение конкретного элемента в топике, сделан в виде UUID, часть битов которого формируется на основании метки времени. 

Key + value — смысловые поля, которые и формируют ивент. В нашем случае blobs, чтобы можно было положить что угодно. Для верной десериализации используем payload_version, которая ссылается на конкретную схему. Headers может содержать в себе примерно все что угодно для удобства. Мы, например, используем под разметку типов сообщения, чтобы консьюмеры могли пропускать ненужные сообщения.

Таблица CONSUMER_LOCK предназначена для координации потребителей паттерна. Group_id аналогично группе в Kafka — сущность, объединяющая консьюмеров в общность. Topic и partition_id работают так же, как в таблице OUTBOX_EVENTS. Offset — текущий указатель лока группы, locked_till показывает время, до которого лок держится за группой на диапазон значений.

Зная схему данных, погрузиться в сам процесс потребления данных из исходящих таблиц не представляется сложным. Рассмотрим этот процесс.

Sequence diagram процесса потребления событий из Outbox

Sequence diagram процесса потребления событий из Outbox

В процессе потребления событий есть два места, где используются LWT: при захвате лока за консьюмер группой или его обновлении и при коммите офсета. Все начинается с лока по primary key в CONSUMER_LOCK — bucket нужен для расчета offset: оба основаны на временных метках. 

После установления лока вычитываются соответствующие события по фильтрации offset, при этом проверяется схема десериализации и хедеры на предмет учета сообщения. Затем десериализованные сообщения отправляются получателям (неважно, синхронно или асинхронно). После подтвержденной отправки ивента коммитится путем передвижения каретки на соответствующую позицию и им присваивается TTL.

Важный момент: между заполнением и чтением outbox должен быть временной лаг для гарантированного перебора всех элементов за выбранный интервал по timestamp. Связано это с процессами репликации внутри Cassandra. В нашем кейсе нет строгих требований и ограничений по скорости разбора исходящей таблицы, так что мы установили задержку в час.

Вот и весь велосипед 🙂

Результаты НТ вместо выводов

Конфигурация: 1 дц, 3 ноды по 4 CPU 16 RAM. Замеряли сохранение единичных (вне батча) карт, в трафике ~90% новых записей, которых нет в БД. А еще замеряли влияние на чтение.

Контрольный замер без реализованного outbox-паттерна. 

Графики RPS на ноду, количество ошибок и процент успешных операций

Графики RPS на ноду, количество ошибок и процент успешных операций
График скорости выполнения операций выполнения по 95-му перцентилю (200 — только чтение карты, 201 — чтение + создание карты)

График скорости выполнения операций выполнения по 95-му перцентилю (200 — только чтение карты, 201 — чтение + создание карты)
График скорости выполнения операций выполнения по 99-му перцентилю (200 — только чтение карты, 201 — чтение + создание карты)

График скорости выполнения операций выполнения по 99-му перцентилю (200 — только чтение карты, 201 — чтение + создание карты)
Графики количества операций чтения по 1-му поду клиента и скорости чтения по 99-му (только для БД)

Графики количества операций чтения по 1-му поду клиента и скорости чтения по 99-му (только для БД)
Графики количества операций записи по 1-му поду клиента и скорости чтения по 99-му (только для БД)

Графики количества операций записи по 1-му поду клиента и скорости чтения по 99-му (только для БД)

Замер с реализованным outbox-паттерном.

Графики RPS на ноду, количество ошибок и процент успешных операций

Графики RPS на ноду, количество ошибок и процент успешных операций
График скорости выполнения операций выполнения по 95-му перцентилю (200 — только чтение карты и последующее помещение в Outbox, 201 — чтение + создание карты и последующее помещение в Outbox)

График скорости выполнения операций выполнения по 95-му перцентилю (200 — только чтение карты и последующее помещение в Outbox, 201 — чтение + создание карты и последующее помещение в Outbox)
График скорости выполнения операций выполнения по 99-му перцентилю (200 — только чтение карты и последующее помещение в Outbox, 201 — чтение + создание карты и последующее помещение в Outbox)

График скорости выполнения операций выполнения по 99-му перцентилю (200 — только чтение карты и последующее помещение в Outbox, 201 — чтение + создание карты и последующее помещение в Outbox)
Графики количества операций чтения по 10му поду клиента и скорости чтения по 99-му (только для БД)

Графики количества операций чтения по 10му поду клиента и скорости чтения по 99-му (только для БД)
Графики количества операций записи по 1-му поду клиента и скорости чтения по 99-му (только для БД)

Графики количества операций записи по 1-му поду клиента и скорости чтения по 99-му (только для БД)

При проведении нагрузочного тестирования нас интересовало возможное влияние на предоставляемые услуги записи, так как они имеют ограничения на Response Time и пропускную способность, измеряемую в количестве RPS.

При этом разбор входящей очереди не очень интересовал с точки зрения скорости (а именно там используются тяжеловесные LWT), так как ограничения по скорости доставки данных среди потребителей очень мягкие и не лежат на критическом пути.

По графикам можно сделать выводы:

  1. Response Time по основным перцентилям значительных изменений не претерпел. Мы вынесли за скобки аномальные всплески 1s на тесте без Outbox, так как пришли к выводу, что это скорее связано с перебоями в сети.

  2. В продолжение предыдущего пункта мы перешли на LeveledCompactionStrategy для двух таблиц в исходящей очереди, что позволило вписаться в требуемый Latency.

  3. Количество i-операций, которое способна пропускать через себя как нода-координатор, так и ноды-реплики, не изменилось. Важно, что количество i/o операций на графиках — это нагрузка на ноду-координатор с учетом репликации.

По графикам видно, что какой-либо деградации при поднятом паттерне не отмечается. А значит, мы достигли требуемого функционального результата без видимой деградации со стороны БД.

При дальнейшем росте количества запросов на запись мы ожидаем деградации, которая случилась бы и без исходящей очереди. Это может произойти из-за того, что нода-координатор станет узким горлышком при обработке запросов на запись. Как именно координатор работает, описали в предыдущей статье.

Вопросы, с которыми мы сталкивались

Почему бы не использовать Kafka/Rabbit/PostgreSQL?

Ответ делится на несколько частей: 

  1. Любой новый компонент в цепочке добавляет компонентную связанность и точку отказа, которая, в свою очередь, приводит к тому, что все равно нужен Fallback. Если Kafka уйдет в простой (а она, как и любой другой компонент, может уйти), на нее все равно нужно продумывать дополнительную механику альтернативного потока — и желательно на основном стеке. Такой стек для нас — Cassandra.

  2. Специфика нашего банка такова, что на Kafka иногда дополнительные квоты могут долго выдаваться долго. Так как мы делаем много крутых продуктов и платформ, которым требуются мощности, а ЦОДы не способны увеличивать свою мощность так же быстро. 

  3. Нам желательно иметь легко масштабируемый в обе стороны инструмент, так как массивная нагрузка нужна не на постоянной основе, но расширение может потребоваться в любой момент. А, как мы знаем, Kafka разжимается, но не сжимается (по крайней мере в плане партиций), что приведет к простою значительных ресурсов большую часть времени.

  4. Мы не хотели получить разрастание «зоопарка», так что решили попробовать справиться имеющимися средствами.

Почему не использовать CDC?

У нас в Т-Банке высоко развита культура предоставления инфраструктуры для проектов и платформ в режиме as Service — когда выделенная, централизованная команда отвечает за поддержание работоспособности того или иного стека. Это избавляет команды от необходимости поддержки инфраструктуры на стороне этих самых проектов и платформ. 

Оборотная сторона условия ведет к тому, что у продуктовых и платформенных команд сужен арсенал влияния на эти самые компоненты и использования их неочевидных возможностей. В частности, для нас закрыт CDC как инструмент и взаимодействовать с ним мы не можем.

Что, если какой-то элемент батча битый? Остановится ли разгребания или запись в Outbox?

Нет, мы просто положим элемент в retry-очередь, которая и так существует. На самом деле даже виртуально 2: для автоматического разбора и для ручного по достижению Retry Counter.

Если у меня не так много данных и нет риска Hot Partitioning, что мне делать с бакетом?

При записи можно его сделать константой. Ну или захардкодить небольшое количество бакетов и сделать распределение по Round Robin. И бакетами могут быть обычные int-/string-значения, не обязательно делать их Timestamp-based.

На какую нагрузку на чтение или коммит рассчитана реализация?

При работе из коробки, я бы сказал, проблемы начнутся при 1 500—2 000 RPS на ноду, которые не будут решатся горизонтальным скейлингом. БД будет упираться в работу с системными таблицами и работу координатора запросов. Так что там уже придется что-то придумывать с Compaction Policy. У нас нагрузка сильно меньше. При этом размер батча — 1 000—10 000.

Какой уровень записи и чтения используете?

Мы используем для обоих процессов QUORUM, но считаем, что при определенных обстоятельствах можно играть и с LOCAL_QUORUM.

Что, если ЦОД или пара ЦОДов «отдыхают» и кворум не собирается?

Мы отключаем разбор outbox-таблиц и переходим в режим write-only по LOCAL_QUOROM. Это позволяет в дальнейшем с помощью Gossip Protocol и антиэнтропийных механизмов успешно реплицировать данные при восстановлении работы ЦОДа.

ссылка на оригинал статьи https://habr.com/ru/articles/1041032/