Как использовать topic exchange в RabbitMQ для роутинга по шаблонам

от автора

Привет, Хабр!

Сегодня разберём один из самых гибких инструментов в RabbitMQ — topic exchange. Именно он позволяет не просто отправить сообщение «куда-то», а превратить очередь в маршрутизатор уровня BGP, но только внутри твоей системы.

Что такое topic exchange и в каких архитектурах он нужен

Прежде чем начнём, зафиксируем один фундаментальный принцип: topic exchange — это шаблонный маршрутизатор. Он позволяет описать, кому доставить сообщение, используя гибкие шаблоны ключей, основанные на структуре из точек (.).

Пример ключа:

invoice.created.eu.highvalue

Т.е есть по фатку вместо того чтобы писать условие if region == "eu" and amount > 10000, ты просто биндишь очередь на шаблон:

invoice.created.eu.*

И все, логика переезжает из кода в инфраструктуру.

topic exchange полезен, когда вы строите микросервисную архитектуру с логической маршрутизацией, работаете в мультитенантной среде с разграничением по tenant ID в ключе, настраиваете CI/CD-пайплайны с шаблонными задачами вроде ci.build.java или ci.test.python, реализуете расширяемую систему алертов (alert.#) или ведёте распределённое логгирование (*.error.*). Главный плюс — масштабируемость: добавляйте новые сервисы, меняйте схему маршрутизации, не трогая код отправителей — всё управляется через биндинги.

Синтаксис и семантика

topic exchange работает с двумя спецсимволами:

  • * — один сегмент (одно слово)

  • # — ноль или больше сегментов

Важно помнить:

routing key

matched by

a.b.c

..*, a.*.c, a.#

order.created.us.ny

order.*.us.*, #.ny

ci.test.go

ci.*.go, ci.#

Некоторые нюансы

  1. # не может стоять внутри: a.#.bнедопустимо

  2. заменяет только один сегмент, не два: a.b

  3. routing key должен быть не пустым и без пробелов

Паттерн проектирования ключей

Один из лучших способов описания структуры routing key — это hierarchical domain model, где каждый сегмент описывает контекст:

<domain>.<event>.<scope>.<detail>

Примеры:

  • auth.login.success.user123

  • payment.failed.eu.customer456

  • order.created.us.ny

Такую структуру можно потом биндингами резать как хочешь:

  • auth.*.success.* — ловим успех в авторизации

  • *.failed.# — ловим любые фейлы

  • order.created.# — всё про создание заказов

Пример применения

Допустим, есть платформа, разрезанная на микросервисы. Каждый сервис отвечает за свой бизнес-контекст:

  • auth — аутентификация и авторизация

  • orders — оформление и управление заказами

  • payment — платежи и возвраты

  • notifications — обработка писем и push-уведомлений

Вся система должна быть:

  • отказоустойчивой

  • наблюдаемой

  • расширяемой без редеплоя продюсеров

Проектируем событие в виде routing key, по следующей схеме:

<domain>.<event>.<level>.<region>.<tenant>

Примеры ключей:

auth.login.failed.error.eu.tenant42 payment.refund.processed.info.us.tenant17 orders.create.success.info.ru.tenant01 notifications.email.bounced.warn.eu.tenant42

Требования к системе доставки:

  1. Все события .error..*.* -> очередь critical-ops (обработка ошибок, инциденты)

  2. Все события из payment -> очередь finance-pipeline

  3. Все bounced уведомления ->очередь email-troubles

  4. Все события по tenant42 -> очередь audit-tenant42

  5. Все успешные события .success.info..* -> очередь success-feed для ML-аналитики

Конфигурация exchange и биндингов

channel.exchange_declare(     exchange="events_topic",     exchange_type="topic",     durable=True )

Биндинги:

Очередь

Binding key

critical-ops

..error.*.*

finance-pipeline

payment.#

email-troubles

notifications.email.bounced.#

audit-tenant42

#.*.*.*.tenant42

success-feed

..success.info.*.*

Каждый из этих биндингов позволяет получать события по семантике, не переписывая отправителей. Добавили новый домен — биндим новую очередь. Расширили теги — не трогаем старый код.

Отправка событий

Реализация отправителя producer.py:

import pika  def send_event(routing_key: str, body: str):     connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))     channel = connection.channel()      channel.exchange_declare(exchange="events_topic", exchange_type="topic", durable=True)      channel.basic_publish(         exchange="events_topic",         routing_key=routing_key,         body=body.encode("utf-8"),         properties=pika.BasicProperties(             delivery_mode=2  # persistent         )     )      print(f"Sent [{routing_key}]: {body}")     connection.close()  # Пример отправки события send_event("auth.login.failed.error.eu.tenant42", "Login failed from IP 1.2.3.4")

Обработка сообщений с ack, QoS и ручной контролем

Реализация обработчика ошибок (consumer_critical.py):

import pika  def handle_critical(ch, method, properties, body):     print(f"[CRITICAL] {method.routing_key}: {body.decode()}")     # логика обработки, отправка в Slack, логгинг, мониторинг     ch.basic_ack(delivery_tag=method.delivery_tag)  connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel()  channel.queue_declare(queue="critical-ops", durable=True)  channel.queue_bind(     exchange="events_topic",     queue="critical-ops",     routing_key="*.*.error.*.*" )  channel.basic_qos(prefetch_count=5) channel.basic_consume(queue="critical-ops", on_message_callback=handle_critical)  print("Waiting for critical events...") channel.start_consuming()

prefetch_count=5 — позволяет контролировать, чтобы воркер не захлебнулся, если сообщение тяжёлое. basic_ack — обязательно нужен, чтобы сообщение не считалось обработанным, пока оно реально не пройдено. delivery_mode=2 — сообщение переживёт рестарт брокера. Очередь durable=True — не потеряет биндинги и сообщения при сбоях.

Поведение системы при множестве биндингов

Если событие с ключом payment.refund.failed.error.us.tenant42 отправлено:

  • попадёт в critical-ops (совпадает с ..error.*.*)

  • попадёт в finance-pipeline (совпадает с payment.#)

  • попадёт в audit-tenant42 (совпадает с #.*.*.*.tenant42)

Одно и то же сообщение будет скопировано RabbitMQ в несколько очередей по биндингу. Т.е продюсер делает один publish, а exchange делает мультикаст по правилам.

Как тестировать и отлаживать

Чтобы убедиться, что всё работает:

channel.queue_declare(queue="", exclusive=True) channel.queue_bind(     exchange="events_topic",     queue=result.method.queue,     routing_key="#" )

Временно создаём анонимную очередь с шаблоном #, чтобы получить все события из exchange, и логируем каждый routing key вместе с телом сообщения и совпавшими биндингами.

По итогу построили систему, где:

  • routing key — это контракт маршрутизации

  • topic exchange — центр управления логикой

  • очереди — реализуют семантическую подписку без кода

  • продюсеры не зависят от инфраструктуры подписчиков

Подход сам по себе масштабируем и гибок.


Заключение

topic exchange — проверенный инструмент для построения маршрутизации событий в распределённых системах.

Если вы уже применяете topic exchange в своих проектах — расскажите об этом в комментариях: какие схемы вы проектировали, где ошибались, какие шаблоны оказались самыми удобными? И, конечно, в RabbitMQ есть и другие мощные механизмы: headers exchange для матчей по метаданным, consistent-hash exchange для балансировки, dead letter exchange для надёжной обработки ошибок и alternate exchange как fallback-механизм. Всё это может быть объединено в продуманную архитектуру, если понять, где что работает лучше.


Если вы работаете с микросервисами, интеграциями или хотите глубже понять, как управлять потоками сообщений — приглашаем вас на два открытых урока:

8 июля в 20:00 — «RabbitMQ: как заставить сообщения летать по сложным маршрутам». Разберёмся, как использовать topic exchange для гибкой маршрутизации событий в распределённых системах.

23 июля в 20:00 — «Оптимальные решения на RabbitMQ: или как Кролик превосходит Kafka». Обсудим реальные сценарии, в которых RabbitMQ оказывается предпочтительнее Kafka, и покажем подходы к архитектуре с учётом производительности и надёжности.

Также доступен короткий тест к курсу «RabbitMQ для разработчиков и администраторов», который поможет оценить, насколько уверенно вы ориентируетесь в возможностях брокера и где стоит углубить знания.


ссылка на оригинал статьи https://habr.com/ru/articles/923204/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *