Как мы четыре раза переписали Outbox

от автора

Привет! Я Ильдус Тукаев, разработчик в команде образовательной платформы Т-Банка. Мы помогаем школьникам, студентам, молодым специалистам и сотрудникам получать знания и качать свои софт и хард навыки. Основной язык у нас Go, но сегодня расскажу больше про архитектурную тему: как у нас в команде использовался паттерн Outbox и как он эволюционировал.

За полтора года реализация Outbox прошла четыре итерации. По пути мы ловили VACUUM, который останавливал сервис на три часа, теряли 5% событий за полтора дня и заваливали поддержку обращениями. Расскажу, какие реализации мы пробовали, на какие грабли наступали и почему остановились на варианте, который сами же не считаем идеальным.

Что такое Outbox и зачем он нам понадобился

Начну с простого. Сервис меняет какое-то состояние и должен сообщить об этом другим сервисам через брокер сообщений. Очевидное решение — сначала отправить событие в брокер, потом записать в базу. Но что будет, если запись в базу провалится, например, по таймауту? Событие уже ушло, в базе новое состояние не сохранилось. Получили рассинхрон.

Можно попробовать наоборот: сначала пишем в базу, потом отправляем событие. Тогда возникает другая проблема: что делать, если брокер недоступен? Ретраи помогут до какого-то момента. А если брокер совсем лежит, состояние записано, события нет, другие сервисы об изменении не узнают.

Паттерн Outbox решает проблему рассинхрона между базой и брокером: состояние и событие либо сохраняются вместе, либо не сохраняются вообще. В одной транзакции мы сохраняем и состояние сервиса, и событие в специальную таблицу базы данных. Отдельный воркер потом вытаскивает события из таблицы и отправляет в брокер.

Главный выигрыш — консистентность. Состояние сервиса и событие либо сохраняются вместе одной транзакцией, либо не сохраняются вообще. Не нужно добавлять отменяющие или повторяющие действия — логика отправки становится проще.

Главный минус — задержка отправки. Сообщение проходит теперь в два этапа: сначала — запись в Outbox, потом — отправка в брокер. В брокер оно приходит не мгновенно, между записью и отправкой есть задержка на работу асинхронного воркера.

Наивная реализация

Первая версия была простой и очевидной. Событие сохраняется в таблицу — там есть статус, дата создания и дата обновления. Асинхронный воркер достает пачку событий, отправляет в брокер, потом меняет у них статус и обновляет дату. Отдельная задача чистит записи, у которых дата обновления старше трех дней.

В тот момент через Outbox шло от 300 до 500 тысяч сообщений в сутки, платформой пользовалось около 8 тысяч человек. Все работало.

CREATE TABLE kafka_outbox (    id BIGSERIAL,    event_id UUID NOT NULL,    topic TEXT NOT NULL,    key TEXT NOT NULL,    payload JSONB NOT NULL,    headers JSONB,    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),    status TEXT NOT NULL,    updated_at TIMESTAMPTZ NOT NULL DEFAULT now());CREATE INDEX kafka_outbox_created_at_idx ON kafka_outbox (created_at)WHERE status = 'new';CREATE INDEX kafka_outbox_id_status_idx ON kafka_outbox (id, status)WHERE status = 'new';CREATE INDEX kafka_outbox_updated_at_status_idx ON kafka_outbox (updated_at, status);

Что мы хотели получить от этой реализации:

  • хранить события для дебага и поддержки пользователей, так как в брокере события живут недолго, а доставать их оттуда в нашей инфраструктуре сложно;

  • получить гарантию отправки в брокер: для бизнес-событий мы не можем себе позволить их терять, несмотря на наплывы пользователей или сбои на инфраструктуре

Все три цели мы закрыли. Какое-то время все работало хорошо. А потом случился жесткий VACUUM.

Три часа простоя из-за VACUUM

В какой-то момент VACUUM пришел в таблицу Outbox и полностью остановил работу сервиса. В самом вопиющем случае он отработал около трех часов, пока мы не отменили его вручную. Были инциденты поменьше: VACUUM блокировал сервис по 30—40 минут.

Сервис был еще достаточно новым, было много багов, и не сразу стало понятно, что виноват именно паттерн чистки таблицы. Мы провели много времени на инцидент-колах, пока разобрались.

После расследования причина оказалась такой: какое-то событие застопорилось на отправке и не удалялось. Из-за него не удалялись и остальные записи — они копились. А потом разом приходил воркер и удалял большую пачку. На такую массовую очистку Postgres отвечал тяжелым VACUUM.

Быстрый фикс

В качестве быстрого решения мы отказались от хранения данных и стали удалять события из таблицы сразу после отправки в брокер. В одной транзакции выбрали пачку, отправили, удалили.

Таблицу упростили: убрали поля статуса и даты обновления, лишние индексы. Это сильно ускорило работу. Внедрение стоило практически ничего — доработали за день.

Минус: потеряли историю событий. События теперь не хранятся. Невозможно посмотреть их состояние и нельзя гарантировать, что отправили конкретное событие. Чтобы поискать его постфактум, придется лезть в сам брокер, что в нашей инфраструктуре сложнее. Кроме того, хранятся события не больше суток.

Плюс: стабильность и порядок событий. Долгие приходы VACUUM прекратились, теперь это минута-две, никто не замечает.

Бонус: заодно гарантировали порядок событий. Здесь нас ждал любопытный нюанс в работе Postgres. Раньше мы сортировали события только по дате создания — ID автоинкремента в сортировке не участвовал. Думали, точности timestamp хватит. Оказалось, нет: события могли создаваться с одинаковым timestamp и то, в каком порядке они уйдут в брокер, становилось вопросом удачи.

Для нас это было критично: некоторые сервисы ждут событий в строгом порядке — например, сначала удалить старые данные, потом создать новые. И у нас есть неидемпотентные события, от которых мы пока не избавились. В быстром фиксе мы добавили к сортировке ID, и порядок наконец стал стабильным.

К моменту фикса нагрузка выросла почти вдвое: больше миллиона событий в сутки, 14 тысяч пользователей в день. Реализация справлялась.

Реализация на стероидах

В быстром фиксе мы все еще удаляли каждое отправленное событие — VACUUM приходил недолго, но приходил. И заплатили за это историей: чтобы что-то переотправить, приходилось доставать данные руками или лезть в брокер с его суточным сроком хранения.

Часть решения мы подсмотрели в открытых библиотеках. Для Go хороший пример — это Watermill. Получилось вот что: партиционированная таблица Outbox плюс отдельная таблица Offset, в которой хранится последняя обработанная запись.

Зачем нам две таблицы вместо одной:

  • Запись блокируется только в таблице offsets — она очень легкая, поэтому блокировка не становится узким местом.

  • Таблицы с событиями используются в режиме insert only — только на добавление, без удаления и обновлений.

  • Старые партиции можно отключать, а не удалять записи. Для базы это гораздо более дешевая операция, чем построчное удаление.

В таблице Outbox появилось поле Transaction ID — внутренний ID (xid8) транзакции Postgres. Мы храним его по двум причинам. 

Первая: в связке с offset он гарантирует отправку событий строго в порядке их создания. Внутри одной транзакции порядок задает offset, между транзакциями — Transaction ID.

Вторая: помогает понять, какую запись мы можем обработать последней. Иначе можно проскочить записи, которые сохранятся, когда Postgres закоммитит транзакции уже после нашей выборки.

Что делает воркер за один цикл отправки. Сначала из таблицы Offset берем последний offset и последний Transaction ID. Блокируем запись в Offset, чтобы другой инстанс не забрал те же события. Дальше идем в Outbox и выбираем пачку событий по Transaction ID и offset. Отправляем в брокер. Сохраняем последний offset и Transaction ID обратно в таблицу Offset.

Партиции старше трех дней асинхронно отключает отдельный воркер. Для управления партициями я рекомендую взять готовый менеджер партиций, например pg_partman. Там учтены пограничные случаи, и пользоваться им удобнее, чем писать свой менеджер.

К этому моменту нагрузка снова выросла: около 20 тысяч пользователей в день, почти 2 млн событий в сутки.

Что дала схема с партициями и Transaction ID:

  • События снова хранятся, можно посмотреть и переотправить любое из базы — не надо собирать руками или искать в брокере.

  • Из операций в основной таблице остались только запись и чтение. От удаления и апдейта отказались — это сильно ускорило работу.

  • Порядок событий гарантируется номером транзакции базы данных. Это самый точный порядок, который мы можем получить.

Но не все было гладко. Запрос на выборку из Outbox получился со сложным условием: выбираем по Transaction ID и offset так, чтобы получить все необработанные записи, даже если остановились в середине транзакции с несколькими событиями.

SELECT * FROM (  SELECT "offset", transaction_id, key, topic, payload, headers  FROM kafka_outbox    WHERE    ((transaction_id = $1 AND "offset" > $2) OR (transaction_id > $1))    AND      transaction_id < pg_snapshot_xmin(pg_current_snapshot())) AS messagesORDER BY transaction_id, "offset"LIMIT $3;

Мы ждали, что Postgres пойдет по индексу транзакции и offset. Планировщик сделал по-другому. Он начал выбирать по индексу только верхнюю границу транзакции, а все остальное фильтровал. На пике это выглядело так: Postgres вытаскивал почти всю таблицу и отсекал нижнюю границу фильтром, а это все события за прошедшие дни!

Сервис встал. Мы быстро переключились на предыдущую версию, которая удаляла данные. Все переключения у нас сделаны через флаги — можно поменять переменную окружения, и через минуту все работает иначе.

Самым простым вариантом было упростить условия и разбить на два подзапроса с объединением данных через UNION. Условия каждой части упростились, Postgres пошел по индексу, и все стало быстро.

SELECT * FROM (  SELECT "offset", transaction_id, key, topic, payload, headers  FROM kafka_outbox    WHERE transaction_id = $1 AND "offset" > $2    AND transaction_id < pg_snapshot_xmin(pg_current_snapshot())  UNION ALL  SELECT "offset", transaction_id, key, topic, payload, headers  FROM kafka_outbox    WHERE transaction_id > $1    AND transaction_id < pg_snapshot_xmin(pg_current_snapshot())) AS messagesORDER BY transaction_id, "offset"LIMIT $3;
Limit (cost=2247.15..2247.27 rows=50 width=830)...  ...  -> Append (cost=0.43..2.47 rows=3 width=373)...    -> Index Scan using kafka_outbox_20250424_20250425_pkey...       Index Cond:       ((transaction_id < pg_snapshot_xmin(pg_current_snapshot())) AND        (transaction_id = '391748106'::xid8) AND ("offset" > 1493120))  -> Append (cost=0.43..2091.69 rows=4000 width=830)...    -> Index Scan using kafka_outbox_2025_04_24__2025_04_25_pkey...       Index Cond:       ((transaction_id < pg_snapshot_xmin(pg_current_snapshot())) AND        (transaction_id > '391748106'::xid8))  ...Execution Time: 20.387 ms

Потом мы перешли на более лаконичный вариант с использованием построчного сравнения (Row Constructor Comparison).

SELECT * FROM ( SELECT "offset", transaction_id, event_id, key, topic, payload, headers FROM kafka_outbox WHERE    (transaction_id, "offset") > ($1, $2)    AND transaction_id < pg_snapshot_xmin(pg_current_snapshot())    AND created_at < now() ) AS messages ORDER BY transaction_id, "offset" LIMIT $3

Использовать Transaction ID все равно неприятно: это внутренняя реализация Postgres, гарантий на будущие версии нет. А еще мы завязаны на минимальную исполняющуюся транзакцию. Если кто-нибудь зайдет в базу, наберет begin и забудет про сессию на несколько часов, Outbox перестанет отправлять данные.

Как мы потеряли 5% событий

Мириться с Transaction ID не хотелось, и мы решили попробовать обойтись без него — на дате создания события.

Реализация казалась простой: вместо Transaction ID храним и сравниваем timestamp. Запрос на выборку при этом получился очень коротким и понятным:

SELECT * FROM (  SELECT created_at, "offset", key, topic, payload, headers  FROM kafka_outbox    WHERE created_at = $1 AND "offset" > $2      OR created_at > $1) AS messagesORDER BY created_at, "offset"LIMIT $3;

Дополнительно хотели убрать лишний поход в соседние партиции. Партиционирование у нас по времени создания, и в новый запрос можно ходить только в текущую партицию.

Не сработало, и сейчас понятно почему. Решение было поспешным и шло вразрез нашим же выводам про то, зачем нужен Transaction ID. В моменте идея отказа от него казалась правильной и чистой — мы просто прошли мимо собственного опыта.

Чтобы разобраться, где именно мы сломались, придется вспомнить, как Postgres работает с транзакциями. ID транзакции (xid8) присваивается в момент BEGIN. Функция now() возвращает время первого запроса транзакции — тоже до ее коммита. А записи внутри транзакции становятся видимы другим запросам только после COMMIT (при уровне изоляции Read Committed и выше). 

Опираясь на Transaction ID, воркер отправки знал две вещи: какие транзакции вообще существуют и какие из них еще не закоммичены. Когда мы перешли на timestamp, оба знания пропали. now() не дает никаких гарантий, что между закоммиченными событиями нет дырок от еще незакоммиченных параллельных транзакций.

Где мы ошиблись

Где мы ошиблись

Примерно для 5% событий мы записывали в таблицу offsets timestamp от транзакции, которая стартовала позже параллельных. Когда те параллельные транзакции коммитились, их события уже выпадали из условия выборки. 

Около 5% сообщений из Outbox в брокер не уходили. Мы отработали так примерно полтора дня и только потом заметили проблему. Поддержку завалили обращениями: около тысячи учеников не получили перехода на следующий этап обучения. 

Восстанавливать пришлось вручную: часть событий переотправили полностью, часть — только для пострадавших учеников. Спасли нас старые скрипты, которые мы писали для других инцидентов раньше — их надо было немного поправить под новый код и запустить.

От реализации отказались. У Postgres нет хорошего механизма получить время начала транзакции с нужной нам гарантией. Вернулись к Transaction ID и смирились с его минусами.

Эту итерацию я для себя назвал поспешной. Главный вывод такой: даже если запрос упрощается до приятного, стоит дополнительно проверить, не теряются ли гарантии порядка обработки в конкурентной среде исполнения.

Что можно улучшить

Финал ли это? Нет, всегда есть что улучшить. Если событий через Outbox станет сильно больше, текущий вариант может не справиться — мы выбираем события в один поток.

Реализация на стероидах позволяет масштабироваться: можно завести несколько оффсет-групп и делать выборку по группе. Тогда каждый инстанс приложения возьмет свой поток отправки. Заодно можно сделать выбор лидера, чтобы инстансы не конкурировали впустую.

Балансировщик партиций Kafka и выбор offset-группы должны опираться на одинаковые данные. Иначе при записи в Outbox событию присвоится offset-группа, не соответствующая партиции в Kafka. Сообщения уйдут в разные партиции, а Kafka может гарантировать порядок обработки только в рамках одной партиции.

Сейчас при нашей нагрузке в 2 млн событий в сутки один инстанс приложения занят отправкой примерно 20% времени. По грубой прикидке, мы спокойно переварим до 6 млн событий в сутки. Поэтому многопоточная реализация — задача в горизонте, а не на завтра.

Общие ограничения базы и брокеров никто не отменял. Когда событий станет столько, что их перестанет вытягивать сама база, никакая реализация Outbox не поможет — придется идти в другую сторону.

Какая реализация лучше

Реализация

Плюсы

Минусы

Наивная

Легко разработать, есть история событий

VACUUM может приходить надолго из-за особенностей обновления и удаления записей в постгрес

Быстрый фикс

Самая простая в коде и поддержке, понятная логика

Нет истории событий, отправка в один поток

На стероидах

Самые строгие гарантии порядка, переваривает большой поток событий, история сохраняется

Завязка на внутренний Transaction ID Postgres, риск зависания на минимальной незакрытой транзакции

Многопоточная

Масштабируется по потокам, выдержит большую нагрузку

Сложнее в реализации, требует согласования с балансировщиком Kafka

Сейчас мы используем реализацию на стероидах. Не потому, что считаем ее эталонной: у нее есть слабые места. Просто при текущей нагрузке она дает нужные нам гарантии, а мы научились ее готовить.

Главный вывод после четырех итераций: лучшая реализация та, что останется у вас надолго. Переход с одной реализации на другую — это миграция с новой таблицей, параллельное чтение из обеих, чистка старого кода, переключение, новая миграция и финальная чистка. У меня в первый раз это заняло около месяца. Где-то опасались быстрого перехода, где-то ждали, чтобы не релизиться по пятницам.

Переход от одной миграции к другой

Переход от одной миграции к другой

Куда смотрим дальше

Помимо многопоточной обработки у нас есть еще одна задумка по оптимизации текущей реализации. Хотим отказаться от партиционирования Outbox по времени и перейти на партиции по Transaction ID.

Так получится сильно ускорить вставку и выборку событий, уменьшить нагрузку на обновление индексов и заодно избавиться от вложенного SELECT в основном запросе. Последний пункт особенно заметен в редком, но болезненном сценарии: если воркер отправки какое-то время не работал, выборка большой накопленной пачки сейчас стоит дорого. На партициях по xid8 она пройдет куда быстрее.

Есть пара недостатков, которые мы пока видим в партиционировании по Transaction ID: 

  1. Мы не можем точно определять время хранения записей — только прогнозы и подгонка количества партиций под наши потребности. 

  2. Более сложное управление партициями. Если мы пропустим момент, когда надо создать новые партиции, запись в Outbox может просто остановиться.

Если у вас есть свой опыт реализации Outbox или вы видите подход, который мы упустили, напишите в комментариях. Особенно интересно, как с подобными проблемами справляются те, кто строит Outbox на других СУБД. 

Полезные ссылки

Watermill — Go-библиотека для message-driven-приложений с готовой реализацией Outbox.

pg_partman — расширение Postgres для управления партиционированными таблицами.

pg_cron — расширение Postgres для планирования задач прямо в базе.

ссылка на оригинал статьи https://habr.com/ru/articles/1039560/