Привет, Хабр!
Сегодня разберём один из самых гибких инструментов в RabbitMQ — topic exchange. Именно он позволяет не просто отправить сообщение «куда-то», а превратить очередь в маршрутизатор уровня BGP, но только внутри твоей системы.
Что такое topic exchange и в каких архитектурах он нужен
Прежде чем начнём, зафиксируем один фундаментальный принцип: topic exchange — это шаблонный маршрутизатор. Он позволяет описать, кому доставить сообщение, используя гибкие шаблоны ключей, основанные на структуре из точек (.).
Пример ключа:
invoice.created.eu.highvalue
Т.е есть по фатку вместо того чтобы писать условие if region == "eu" and amount > 10000, ты просто биндишь очередь на шаблон:
invoice.created.eu.*
И все, логика переезжает из кода в инфраструктуру.
topic exchange полезен, когда вы строите микросервисную архитектуру с логической маршрутизацией, работаете в мультитенантной среде с разграничением по tenant ID в ключе, настраиваете CI/CD-пайплайны с шаблонными задачами вроде ci.build.java или ci.test.python, реализуете расширяемую систему алертов (alert.#) или ведёте распределённое логгирование (*.error.*). Главный плюс — масштабируемость: добавляйте новые сервисы, меняйте схему маршрутизации, не трогая код отправителей — всё управляется через биндинги.
Синтаксис и семантика
topic exchange работает с двумя спецсимволами:
-
*— один сегмент (одно слово) -
#— ноль или больше сегментов
Важно помнить:
|
routing key |
matched by |
|---|---|
|
|
|
|
|
|
|
|
|
Некоторые нюансы
-
#не может стоять внутри:a.#.b— недопустимо -
заменяет только один сегмент, не два:
a.b≠ -
routing key должен быть не пустым и без пробелов
Паттерн проектирования ключей
Один из лучших способов описания структуры routing key — это hierarchical domain model, где каждый сегмент описывает контекст:
<domain>.<event>.<scope>.<detail>
Примеры:
-
auth.login.success.user123 -
payment.failed.eu.customer456 -
order.created.us.ny
Такую структуру можно потом биндингами резать как хочешь:
-
auth.*.success.*— ловим успех в авторизации -
*.failed.#— ловим любые фейлы -
order.created.#— всё про создание заказов
Пример применения
Допустим, есть платформа, разрезанная на микросервисы. Каждый сервис отвечает за свой бизнес-контекст:
-
auth— аутентификация и авторизация -
orders— оформление и управление заказами -
payment— платежи и возвраты -
notifications— обработка писем и push-уведомлений
Вся система должна быть:
-
отказоустойчивой
-
наблюдаемой
-
расширяемой без редеплоя продюсеров
Проектируем событие в виде routing key, по следующей схеме:
<domain>.<event>.<level>.<region>.<tenant>
Примеры ключей:
auth.login.failed.error.eu.tenant42 payment.refund.processed.info.us.tenant17 orders.create.success.info.ru.tenant01 notifications.email.bounced.warn.eu.tenant42
Требования к системе доставки:
-
Все события
.error..*.*-> очередьcritical-ops(обработка ошибок, инциденты) -
Все события из
payment-> очередьfinance-pipeline -
Все
bouncedуведомления ->очередьemail-troubles -
Все события по
tenant42-> очередьaudit-tenant42 -
Все успешные события
.success.info..*-> очередьsuccess-feedдля ML-аналитики
Конфигурация exchange и биндингов
channel.exchange_declare( exchange="events_topic", exchange_type="topic", durable=True )
Биндинги:
|
Очередь |
Binding key |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Каждый из этих биндингов позволяет получать события по семантике, не переписывая отправителей. Добавили новый домен — биндим новую очередь. Расширили теги — не трогаем старый код.
Отправка событий
Реализация отправителя producer.py:
import pika def send_event(routing_key: str, body: str): connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.exchange_declare(exchange="events_topic", exchange_type="topic", durable=True) channel.basic_publish( exchange="events_topic", routing_key=routing_key, body=body.encode("utf-8"), properties=pika.BasicProperties( delivery_mode=2 # persistent ) ) print(f"Sent [{routing_key}]: {body}") connection.close() # Пример отправки события send_event("auth.login.failed.error.eu.tenant42", "Login failed from IP 1.2.3.4")
Обработка сообщений с ack, QoS и ручной контролем
Реализация обработчика ошибок (consumer_critical.py):
import pika def handle_critical(ch, method, properties, body): print(f"[CRITICAL] {method.routing_key}: {body.decode()}") # логика обработки, отправка в Slack, логгинг, мониторинг ch.basic_ack(delivery_tag=method.delivery_tag) connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.queue_declare(queue="critical-ops", durable=True) channel.queue_bind( exchange="events_topic", queue="critical-ops", routing_key="*.*.error.*.*" ) channel.basic_qos(prefetch_count=5) channel.basic_consume(queue="critical-ops", on_message_callback=handle_critical) print("Waiting for critical events...") channel.start_consuming()
prefetch_count=5 — позволяет контролировать, чтобы воркер не захлебнулся, если сообщение тяжёлое. basic_ack — обязательно нужен, чтобы сообщение не считалось обработанным, пока оно реально не пройдено. delivery_mode=2 — сообщение переживёт рестарт брокера. Очередь durable=True — не потеряет биндинги и сообщения при сбоях.
Поведение системы при множестве биндингов
Если событие с ключом payment.refund.failed.error.us.tenant42 отправлено:
-
попадёт в
critical-ops(совпадает с..error.*.*) -
попадёт в
finance-pipeline(совпадает сpayment.#) -
попадёт в
audit-tenant42(совпадает с#.*.*.*.tenant42)
Одно и то же сообщение будет скопировано RabbitMQ в несколько очередей по биндингу. Т.е продюсер делает один publish, а exchange делает мультикаст по правилам.
Как тестировать и отлаживать
Чтобы убедиться, что всё работает:
channel.queue_declare(queue="", exclusive=True) channel.queue_bind( exchange="events_topic", queue=result.method.queue, routing_key="#" )
Временно создаём анонимную очередь с шаблоном #, чтобы получить все события из exchange, и логируем каждый routing key вместе с телом сообщения и совпавшими биндингами.
По итогу построили систему, где:
-
routing key — это контракт маршрутизации
-
topic exchange — центр управления логикой
-
очереди — реализуют семантическую подписку без кода
-
продюсеры не зависят от инфраструктуры подписчиков
Подход сам по себе масштабируем и гибок.
Заключение
topic exchange — проверенный инструмент для построения маршрутизации событий в распределённых системах.
Если вы уже применяете topic exchange в своих проектах — расскажите об этом в комментариях: какие схемы вы проектировали, где ошибались, какие шаблоны оказались самыми удобными? И, конечно, в RabbitMQ есть и другие мощные механизмы: headers exchange для матчей по метаданным, consistent-hash exchange для балансировки, dead letter exchange для надёжной обработки ошибок и alternate exchange как fallback-механизм. Всё это может быть объединено в продуманную архитектуру, если понять, где что работает лучше.
Если вы работаете с микросервисами, интеграциями или хотите глубже понять, как управлять потоками сообщений — приглашаем вас на два открытых урока:
8 июля в 20:00 — «RabbitMQ: как заставить сообщения летать по сложным маршрутам». Разберёмся, как использовать topic exchange для гибкой маршрутизации событий в распределённых системах.
23 июля в 20:00 — «Оптимальные решения на RabbitMQ: или как Кролик превосходит Kafka». Обсудим реальные сценарии, в которых RabbitMQ оказывается предпочтительнее Kafka, и покажем подходы к архитектуре с учётом производительности и надёжности.
Также доступен короткий тест к курсу «RabbitMQ для разработчиков и администраторов», который поможет оценить, насколько уверенно вы ориентируетесь в возможностях брокера и где стоит углубить знания.
ссылка на оригинал статьи https://habr.com/ru/articles/923204/
Добавить комментарий