Проектируем сервис HTTP-запросов: Kafka, PostgreSQL, Redis-очередь и миллионы логических партиций

от автора

Для тех, кому хочется сразу посмотреть код: репозиторий сервиса — в конце текста.


Откуда задача

Нужен сервис, который централизованно выполняет исходящие HTTP-запросы для экосистемы микросервисов и интеграций. Постановка на уровне требований:

  1. Два режима входа — и синхронный (ответ нужен вызывающей стороне), и асинхронный (достаточно принять задачу и отдать результат «куда-то ещё»).

  2. Два канала постановки — удобно и через HTTP API, и напрямую в Kafka (без лишнего hop через HTTP).

  3. Rate limit — защита квот и предсказуемое поведение при 429 со стороны внешних API.

  4. Кеш ответов — снижение нагрузки на внешние системы и наши же ресурсы.

  5. Строгий порядок там, где он важен — если указан ключ партиции в «ordered»-режиме, сообщения по этому ключу не должны перемешиваться.

  6. Масштаб по числу ключей — сотни тысяч и миллионы логических партиций при ограниченном числе воркеров; одна «тяжёлая» партиция не должна блокировать остальные.

  7. Ретраи и отложенные сообщения — экспоненциальные паузы, очередь «на завтра», планирование на произвольный горизонт.

  8. Единая точка наблюдаемости по действиям — желательно, чтобы «что произошло с задачей» оставалось в журнале событий (логе), а не только в памяти воркера.

Ни одна «одна технология» не закрывает это без слоёв. Сначала — почему в стеке именно Kafka, PostgreSQL и Redis; дальше — как мы спроектировали сервис Requester: контекст, контракты запроса/ответа, движение данных, внутренние воркеры, graceful shutdown, детали rate limit / retry / cache / отложенных задач, wake-up, тестирование и узкое место с большими payload в Redis.


Выбор стека

Асинхронный режим — это в первую очередь очередь: задачи копятся и обрабатываются не в момент вызова API, а позже, конкурируя за пул воркеров. Притом нам нужен строгий порядок там, где бизнес этого требует (по логическому ключу), и при этом журналируемый вход/выход для интеграций. Для такой модели Kafka — естественный выбор: распределённый append-only лог, топики, партиции, consumer groups, долговременное хранение, высокая пропускная способность. Входные и выходные события сервиса мы ведём через Kafka, чтобы единая цепочка «приняли задачу → обработали → отдали ответ» оставалась в брокере.

Отложенные сообщения и ретраи «на часы и дни» нельзя свести только к retention и «переливанию» в рамках одного брокера без надёжного учёта на стороне приложения. Нужна долговечная, транзакционно предсказуемая таблица фактов: когда задача должна снова стать готовой, какая у неё полезная нагрузка, как избежать дублей при гонке воркеров. Для этого оптимальна PostgreSQL: диск, SKIP LOCKED, индексы по scheduled, бэкапы, привычные операции для outbox-шаблона.

Остаётся зазор: ни Kafka, ни PostgreSQL сами по себе удобно не закрывают сценарий «миллионы логических партиций (ключей) + общий пул воркеров + справедливая конкуренция + строгий порядок внутри ключа + откладывание на секунды + ретраи с блокировкой партиции». В consumer group Kafka партиции привязаны к консьюмерам: это не тот же паттерн, что динамическая task-очередь, где воркер в следующий тик берёт следующую готовую задачу из любой логической партиции. Заводить отдельную физическую сущность на каждый бизнес-ключ в Kafka или в RabbitMQ — нереалистично. Outbox в PostgreSQL отлично хранит «запланировано на завтра», но как планировщик следующего тика для сотен тысяч ключей с разным scheduled и приоритетом он не заменяет лёгкий in-memory движок с Lua-атомарностью.

Оптимальный третий слой — Redis и очередь задач на нём: один инстанс (или кластер по мере роста) хранит состояние планировщика — ZSET, блокировки ordered-партиций, идемпотентность, heartbeat воркеров — без миллионов отдельных «очередей» в смысле брокера. Как устроена модель партиция = ordered-порядок / общий пул / reject и отложенные сообщения, rate limit как cooldown партиции, зачем Lua и какие компромиссы по памяти и durability — мы подробно разобрали в отдельной статье: «Очередь на Redis с Lua: порядок в партициях, общий пул воркеров и отложенные сообщения».

Роль smart-redis-queue

В Requester эту роль играет библиотека smart-redis-queue: ordered-партиции с префиксом !, приоритеты, prefetch, Reject/RejectWithDelay, heartbeat и возврат задач при смерти воркера — то есть всё, что в предыдущей статье описано на уровне структур ключей и Lua-скриптов. Requester не дублирует эту механику — он использует очередь как движок планирования: что лежит в payload задачи (полный JSON или ссылка на offset в Kafka) — слой контракта сервиса.


Контекст системы (уровень «ящиков»)

Внешние сервисы могут:

  • вызывать HTTP API Requester;

  • публиковать задачи в топик Kafka in.

Requester выполняет HTTP к целевым URI (или специальный режим без реального HTTP — см. wake up), учитывает лимиты, кеш, порядок и ретраи. Все ответы попадают в топик Kafka out. Дальше потребители забирают результаты сами, либо стоит Redpanda Connect (или аналог), который читает out и по ключам / заголовкам / полям proxyData раскидывает сообщения в нужные топики или очереди целевых сервисов.

Зачем такая «лестница» из компонентов. Kafka даёт append-only журнал: что задача вошла в систему и что из неё вышло — остаётся в топиках. Redis-очередь решает партиционирование и планирование с общим пулом воркеров без заведения миллиона физических очередей. PostgreSQL — долговременный outbox для задач «далеко в будущем» и длинных бэкоффов, чтобы не держать гигантские ZSET и не упираться в модель отложенных сообщений только в Redis.


C4: контейнеры (кратко)

Контейнер

Роль

Requester (процесс)

HTTP API, consumer in, пул воркеров Redis, пул воркеров PostgreSQL→Kafka, consumer out для sync, Kafka producer.

Kafka

Топик in — вход; топик out — выход; единый лог и точка интеграции для Connect.

Redis

Rate limiter, кеш ответов, очередь задач (smart-redis-queue).

PostgreSQL

Таблица outbox: задачи с scheduled в будущем, ретраи с задержкой > 1 с (для не-ordered сценариев).


Контракты: запрос и ответ

Единая форма задачи в HTTP (POST /request) и в Kafka (топик in, value — JSON) совпадает по смыслу: тело = объект с полями request и meta. Ответ, который забирают из топика out (и в режиме sync: true — ещё и в HTTP), — объект response + meta.

request — что выполнить

Поле

Тип / формат

За что отвечает

uri

строка

Целевой URL исходящего HTTP-вызова. Пустая строка — режим wake up: реальный HTTP не делаем, body/headers уходят в ответ «как есть» (см. ниже).

method

строка

HTTP-метод (GET, POST, …).

body

строка

Тело запроса (как правило JSON в виде строки; сервис не парсит схему — это контракт с целевым API).

headers

объект строк

Заголовки к целевому запросу.

meta — политика обработки и идентификаторы

Поле делает сервис гибким, позволяя настраивать стратегии работы сервиса на клиенте который шлет запросы.

Поле

Тип / формат

За что отвечает

requestId

строка, обязателен

Сквозной идентификатор задачи: попадает в out, в ключ Kafka, в meta ответа; по нему маршрутизируют downstream и в sync ждут результат в Hub.

traceparent

строка, опционально

W3C Trace Context: сервис принимает от клиента и продлевает в трейсинг; при пустом значении может заполняться на входе HTTP.

partition

строка, опционально

Логическая партиция очереди в Redis. Если имя начинается с !, включён ordered-режим: строгий порядок внутри ключа, Reject/rate limit ведут себя иначе, чем у обычных партиций.

sync

bool, опционально

true — дождаться результата в HTTP (пока воркер не положит ответ в out, out-consumer доставит в Hub). false или отсутствует — 202 Accepted, результат только в out.

scheduled

время (RFC3339), опционально

Когда первый раз считать задачу готовой к исполнению. Влияет на маршрут Redis (близкое будущее) vs PostgreSQL (дальше ~1 с).

proxyData

произвольный JSON

Прозрачный конверт: Requester не интерпретирует — копирует в meta ответа, чтобы Connect / потребители могли маршрутизировать по своим полям (целевой сервис, тенант, тип события и т.д.).

limiter

объект, опционально

Rate limit до HTTP (и до кеша). См. вложенные поля.

retry

объект, опционально

Политика повторов при 429/5xx/сети для не-ordered; для ordered — используется иначе (reject с задержкой, см. раздел про retry).

cache

объект, опционально

Кеш ответов: ключ и TTL; кладём только успешные 2xx.

meta.limiter:

Поле

За что отвечает

key

Ключ в Redis для счётчиков/окон (общий квотный «ведро»-идентификатор).

algorithm

Имя стратегии: token-bucket, leaky-bucket, fixed-window-counter, sliding-window-log, sliding-window-counter.

rates

Список пар окно + лимит: каждый элемент — duration (строка длительности, по правилам Go time.ParseDuration: 1h, 3s, 500ms, …) и value (целое число разрешённых срабатываний за окно).

meta.retry:

Поле

За что отвечает

max

Максимальное число попыток (с учётом ретраев после первого запуска).

delay / maxDelay

Стартовая и максимальная пауза между попытками (строка длительности).

multiplier

Множитель экспоненциального бэкоффа.

jitter

Доля случайного разброса вокруг задержки (снижает «столбики» нагрузки).

meta.cache:

Поле

За что отвечает

key

Ключ записи в Redis (префикс кеша добавляется на стороне сервиса).

ttl

Время жизни кешированного ответа.

В Kafka доработанная копия задачи (после consumer/processor) также содержит служебные поля верхнего уровня: tryCount (число уже выполненных/запланированных попыток), createdAt (время постановки) — в HTTP при первой отправке клиент их не передаёт, сервис при необходимости проставляет при сериализации Task.

response + meta (топик out и тело sync-ответа)

Поле

Раздел

За что отвечает

response.status

response

HTTP-статус целевого ответа (или синтетический 200 при wake up, 5xx при внутренней ошибке).

response.body / response.headers

response

Тело и заголовки ответа.

meta.requestId

meta

Тот же requestId, что во входе — связка «запрос–ответ».

meta.tryCount

meta

Сколько раз по сути «доходили» до исхода (учёт ретраев).

meta.time

meta

Время обработки на стороне Requester (длительность).

meta.cached

meta

true, если ответ взят из кеша, а не сходил в сеть.

meta.waitTime

meta

Ожидание, связанное с лимитером/паузой (для sync при 429 — в т.ч. для заголовка Retry-After).

meta.proxyData

meta

Эхо из входа: тот же произвольный JSON для downstream.

Сериализация длительностей: в моделях ответа поля meta.time и meta.waitTime имеют тип time.Duration; стандартный encoding/json в Go сериализует их целым числом наносекунд. Если понадобится строка вида 150ms на wire — это уже смена контракта (кастомный MarshalJSON).


Движение данных: от сообщения до ответа

Ключевая идея: в Kafka уже лежит полное тело задачи (JSON). Consumer in не обязан таскать этот JSON в Redis целиком.

Ветвление у consumer in

  • Если задача готова или наступит в пределах ~1 секунды — в Redis уходит лёгкая запись: ссылка на позицию в логе Kafka (topic, partition, offset) — десятки байт JSON.

  • Если отложена дальше — полный payload пишется в PostgreSQL (outbox). Когда наступает время, воркеры PostgreSQL снова отправляют задачу в Kafka in, откуда она обрабатывается обычным путём.

Зачем из PostgreSQL снова в Kafka, а не «сразу в Redis»

  1. Единый путь — вся жизнь задачи снова проходит через in: проще рассуждать, проще трассировка, один формат сообщения.

  2. Журнал — в in остаётся запись о том, что отложенная задача «проснулась»; это не теряется внутри закрытого цикла БД→приложение.

  3. Порядок относительно других событий — новая запись в in упорядочивается так же, как и остальной поток (при одной партиции топика in — глобально FIFO на уровне топика; детальная упорядоченность по бизнес-ключу обеспечивается уже Redis-очередью для ordered-партиций).

Порядок и шаринг воркеров между логическими партициями на этом пути обеспечивает слой Redis; зачем он нужен в связке с Kafka и PostgreSQL — в разделе «Выбор стека» выше.


Логика приложения: из чего состоит процесс

Компоненты

  1. Один consumer группы на топик in
    Достаточно для «перекладывания» сообщений в Redis или PostgreSQL. Топик in в эксплуатации разумно держать с одной партицией, чтобы не плодить параллельные ветки на этом раннем этапе: партиционирование по бизнес-ключу целиком отдаём Redis, чтобы не усложнять систему вторым уровнем партиций в Kafka.

  2. Пул воркеров PostgreSQL (SKIP LOCKED, несколько горутин)
    Забирают готовые строки, отправляют задачу обратно в in.

  3. Пул воркеров Redis (число = MAX_WORKERS)
    Берут задачи из очереди, разрешают payload (чтение из Kafka по offset при необходимости), вызывают processor.

  4. Пул Kafka Fetcher’ов
    По одному соединению на воркер: иначе один mutex на чтение по offset стал бы узким местом при сотнях воркеров.

  5. HTTP-сервер
    POST /request — постановка задачи в in; режим sync: true ждёт результат через локальный Hub, куда попадает ответ из consumer out.

  6. Out consumer
    Отдельная уникальная consumer group на инстанс (hostname + pid), старт с OffsetNewest, чтобы не читать историю при рестарте — нужен только мост в Hub для синхронных клиентов.

Запуск и graceful shutdown

Порядок остановки осмысленный для операторов:

  1. SIGINT / SIGTERM — начинаем shutdown.

  2. Фаза 1: HTTPServer.Shutdown: новые запросы не принимаем, висящие соединения могут дописать ответ.

  3. Фаза 2: воркеры — отмена контекста, WaitGroup дожидается завершения consumer in, PG movers, пула Redis, consumer out.

  4. Фаза 3: закрытие ресурсов — defer на producer’ы, Redis, пул fetcher’ов, БД.

Таймаут shutdown — до порядка минуты, чтобы не обрывать задачи в полёте без шанса на корректное завершение.


Rate limit, retry, cache, отложенные сообщения

Rate limit

Перед кешом и HTTP выполняется проверка лимитера (Redis + стратегии из семейства token/leaky/sliding/fixed window). При отказе:

  • для ordered-партиций (meta.partition начинается с !) — возвращается reject с задержкой (RejectWithDelay): очередь сама блокирует партицию на TTL, не крутим tight loop;

  • для обычных партиций — задача перепланируется на RetryAfter в Redis (если задержка короткая) или в PostgreSQL (если длинная).

Retry

На 429 и 5xx (и на ошибки сети) при наличии meta.retry и неисчерпанном max:

  • ordered — снова через reject + backoff в Redis (порядок и блокировка партиции сохраняются);

  • не ordered — задача сериализуется и уходит в Redis (≤ 1 с до следующей попытки) или в PostgreSQL (> 1 с), processor делает Ack текущей Redis-задачи, потому что retry уже «новая» постановка.

Бэкофф — экспоненциальный с потолком maxDelay и джиттером.

Кеш

При meta.cache ответы 2xx кладутся в Redis (обёртка go-redis/cache). Параллельные промахи по одному ключу схлопываются через singleflight: один реальный HTTP, остальные ждут.

Отложенные сообщения

Поле meta.scheduled: consumer in сам решает — сейчас в Redis (как правило, с offset-ref) или в PostgreSQL для дальнего горизонта.


Wake up (инициация без HTTP)

Если uri пустой, HTTP-вызов не выполняется: тело и заголовки из заявки попадают в ответ как «успешный» 200. В связке с scheduled и proxyData это удобный будильник для других сервисов: в нужный момент в out уходит сообщение, которое downstream может трактовать как событие или команду. Redpanda Connect дальше маршрутизирует по ключу или по полям payload.


Как тестировали

  • WireMock — контролируемый mock HTTP: 500, 429, задержки, детерминированные ответы для сценариев retry и rate limit.

  • k6 — нагрузочные скрипты в репозитории: отдельно limiter, retry, cache, ordered-партиции, max RPS; есть сценарий прямой записи в Kafka in и замера обработки через out (custom k6 с xk6-kafka).

  • Prometheus + Grafana — метрики инфраструктуры и сервиса (в docker-compose заготовки есть).

  • OpenTelemetry — опционально (OTEL_ENABLED=1): полный трейс от http.requestkafka.in.consumekafka.fetchprocessor.handlehttp.clientkafka.outkafka.out.consume, визуализация в Jaeger. Удобно ловить цикл PG → Kafka in → Redis → out.


Узкое место: большие сообщения в Redis

На практике всплыло узкое место: при больших телах задач пропускная способность заметно проседала, потому что крупный JSON гонялся через Redis (память, сеть, сериализация, время Lua/round-trip). Redis в этой архитектуре — координатор очереди, а не хранилище «толстых» документов.

Решение: в Redis кладём только ссылку на offset в Kafka (OffsetRef: topic, partition, offset). Воркер перед обработкой достаёт оригинальное сообщение из Kafka через выделенный fetcher. Пропускная способность и стабильность выросли; память Redis перестала быть линейной от размера body запросов.

Компромисс: пока сообщение не закоммичено и не вычитано, нужна согласованность retention в Kafka с горизонтом отложенных задач в Redis; для «дальних» отложенных задач по-прежнему используется PostgreSQL с полным payload.


Оптимизация через Kafka fetcher: компромиссы

Ссылка на offset вынуждает воркер перечитать запись в Kafka по конкретной позиции. Это не тот же паттерн, что последовательное чтение «хвоста» consumer group-ом: с точки зрения диска возможны скачкообразные (random) обращения к сегментам, выше доля промахов по page cache брокера по сравнению с идеальным стриминговым read.

Для порядка величины ~2500 RPS на нашем контуре это не стало практическим узким местом: у современных SSD (NVMe) произвольный ввод-вывод по-прежнему выдерживает такой профиль лучше, чем механическим дискам прошлого поколения; узким местом ранее оставались именно объём данных через Redis, а не IOPS брокера.

Retention. Если retention в топике in короткий, а воркеры отстают, теоретически сообщение по offset может уже исчезнуть из лога, пока фасад Redis ещё ссылается на него. В нашем продуктовом допущении: отправлять наружу то, что «пролежало» в очереди дольше ~5 часов, не требуется — срок retention и эксплуатационные лимиты можно согласовать с этим горизонтом. Дляё отложенных на длительные сроки задач полный payload по-прежнему живёт в PostgreSQL, а не в цепочке offset-ref.

Куда развиваться, если профиль изменится: хранить тело в отдельном объектном хранилище (S3-совместимое, MinIO и т.д.) с ключом в Redis; либо снова грузить сжатый payload в Redis (snappy/zstd), пожертвовав частью выигрыша по RAM. Как вариант, комбинировать подходы — меньше N kb в redis, все остальное в хранилище. Или даже сделать ttl store в самом приложении и если воркеры успели, то брать из памяти. Нужно экспериментировать и подбирать оптимальный вариант. Это запасной фронт работ.

На текущем этапе offset + fetcher — осмысленный компромисс: Redis остаётся лёгким, RPS отправки (producer) и общая устойчивость сценария для нас достаточны; вместо преждевременной усложнёнки отдельным store мы фиксируем цифры в нагрузочных тестах и при росте нагрузки возвращаемся к вариантам выше.


Итог

  • Спроектирован Requester как сервис исходящих HTTP-запросов с API и Kafka, с единым выходом в топик out для интеграций и Connect.

  • Kafka — журнал и точка входа/выхода; PostgreSQL — outbox для длинных задержек; Redis + smart-redis-queue — партиции, порядок, пул воркеров, rate limit и отложенность «вблизи».

  • Offset-ref и Kafka fetcher сняли давление с Redis; цена — random I/O и зависимость от retention; при ~2500 RPS и согласованных SLA по «свежести» задач это приемлемо; при необходимости — отдельный store или сжатие в Redis.

  • Graceful shutdown, k6, WireMock, Grafana и OpenTelemetry закрывают эксплуатационный цикл: от нагрузочного теста до трассировки полного пути сообщения.

Репозиторий: https://github.com/Rinsvent/requester.


Приложение: ссылки

ссылка на оригинал статьи https://habr.com/ru/articles/1028010/