RabbitMQ Direct Reply-to. RPC поверх кролика без дополнительных очередей (пример на Python)

от автора

Реализацией RPC запросов поверх брокеров сообщений никого не удивишь: очередь для запроса, очередь для ответа — ничего сложного.

Тот же RabbitMQ имеет пример в официальной документации. Других примеров там нет, поэтому создается впечатление, что отправка ответных сообщений в другую очередь — единственный возможный способ реализации RPC.

Этот сценарий отлично работает когда у нас есть непрерывный поток сообщений и непрерывный поток ответов на них. Однако, данный подход не применим в случаях, когда нам нужно отправить только одно сообщение и получить ответ именно на него. Мы сразу же попадаем в какой-то ад с фильтрацией ответов по correlation_id.

На самом деле, в RabbitMQ есть механизм и для такого сценария. Но он спрятан в недрах документации и о нем почти нет информации в интернете (особенно рабочих примеров кода).

Вот это недоразумение мы сейчас и исправим.

rpc

P.S: Здесь я не буду объяснять, кто такой этот ваш RabbitMQ и зачем он нужен: эту информацию вы можете найти в другой моей статье.

Direct Reply-TO

Работает все достаточно просто, за исключением некоторых нюансов, которые всплывают на практике.

Концепция заключается в следующем:

  • мы подписываемся на специальную псевдоочередь amqp.rabbitmq.reply-to
  • отправляем сообщение с указанием этой очереди в качестве reply-to заголовка
  • кролик генерирует для нас уникальный routing_key, по которому будет должно быть опубликовано ответное сообщение в default exchange
  • сервер получает наше сообщение и отправляет ответ по этому routing_key.

Нет нужды создавать какие-либо дополнительные очереди, нет дополнительных расходов на управление ими со стороны RMQ. Это абсолютно win-to-win механизм.

Алгоритм действий:

Со стороны клиента:

  • (СНАЧАЛА) подписываемся на очередь с волшебным названием amqp.rabbitmq.reply-to в no-ack режиме, объявлять ее не нужно
  • отправляем сообщение с указанием заголовка reply-to = amqp.rabbitmq.reply-to

Со стороны сервера:

  • получаем сообщение. В нем, в качестве reply-to заголовка будет нечто вида amqp.rabbitmq.reply-to.<uuid>
  • отправляем ответ в default exchange с reply-to значением в качестве ключа маршрутизации

На этом, в принципе, все. Однако каждое слово в этом алгоритме важно: сначала отправили, потом подписались — провал, попытались объявить очередь — провал, подписались в режиме ack — снова провал и т.д.

Поэтому мне пришлось потратить некоторое количество времени на написание рабочего кода. Давайте перейдем к нему, чтобы разобраться подробнее?

Python Example

Пример будет приведен с использование библиотеки aio-pika так как свою реализацию я писал имеенно на ней.

Пишем сервер

Сначала напишем некий бойлерплейт для подключения к очереди:

import asyncio from functools import partial import aio_pika  async def consumer(     msg: aio_pika.IncomingMessage,     channel: aio_pika.RobustChannelб ):     ...  async def main():     connection = await aio_pika.connect_robust(         "amqp://guest:guest@127.0.0.1/"     )      queue_name = "test"      async with connection:         channel = await connection.channel()         queue = await channel.declare_queue(queue_name)         # через partial прокидываем в наш обработчик сам канал         await queue.consume(partial(consumer, channel=channel))          try:             await asyncio.Future()         except Exception:             pass  asyncio.run(main())

А теперь перейдем к нашей функции-обработчику:

async def consumer(     msg: aio_pika.IncomingMessage,     channel: aio_pika.RobustChannel, ):     # используем контекстный менеджер для ack'а сообщения     async with msg.process():         print(msg.body)          # проверяем, требует ли сообщение ответа         if msg.reply_to:             # отправляем ответ в default exchange             await channel.default_exchange.publish(                 message=aio_pika.Message(                     body=b"hi!",                     correlation_id=msg.correlation_id,                 ),                 routing_key=msg.reply_to,  # самое важное             )

Как вы видите, действительно ничего сложного.

Пишем клиент

А вот тут будет немного веселья. Наша цель сделать такой же просто интерфейс как у requests:

data = requests.get("https://my-url.com").json()

Однако, это не так просто. Помните, что сначала нужно подписаться на ответную очередь? Так мы получаем следующий код:

import asyncio import aio_pika  RABBIT_REPLY = "amq.rabbitmq.reply-to"  async def consume_response(msg: aio_pika.IncomingMessage):     print(msg.body)  async def main():     connection = await aio_pika.connect_robust(         "amqp://guest:guest@127.0.0.1/"     )      async with connection:         channel = await connection.channel()          callback_queue = await channel.get_queue(RABBIT_REPLY)          # сначала подписываемся         consumer_tag = await callback_queue.consume(             callback=consume_response,             no_ack=True,  # еще один важный нюанс         )          # потом публикуем         await channel.default_exchange.publish(             message=aio_pika.Message(                 body=b"hello",                 reply_to=RABBIT_REPLY  # указываем очередь для ответа             ),             routing_key="test"         )  asyncio.run(main())

Так мы получаем ответное сообщение в нашу функцию-обработчик. Однако, теперь его нужно как-то достать оттуда. Для этого будем использовать asyncio.Queue.

import asyncio import aio_pika  RABBIT_REPLY = "amq.rabbitmq.reply-to"  async def main():     connection = await aio_pika.connect_robust(         "amqp://guest:guest@127.0.0.1/"     )      async with connection:         channel = await connection.channel()          callback_queue = await channel.get_queue(RABBIT_REPLY)          # создаем asyncio.Queue для ответа         rq = asyncio.Queue(maxsize=1)          # сначала подписываемся         consumer_tag = await callback_queue.consume(             callback=rq.put,  # помещаем сообщение в asyncio.Queue             no_ack=True,  # еще один важный нюанс         )          # потом публикуем         await channel.default_exchange.publish(             message=aio_pika.Message(                 body=b"hello",                 reply_to=RABBIT_REPLY  # указываем очередь для ответа             ),             routing_key="test"         )          # получаем ответ из asyncio.Queue         response = await rq.get()         print(response.body)          # освобождаем RABBIT_REPLY         await callback_queue.cancel(consumer_tag)  asyncio.run(main())

Теперь у нас уже есть что-то похожее на синхронный запрос-ответ. Можно немного поколдовать над интерфейсами и вы получите RPC over RMQ запрос, идентичный натуральному requests.

Вместо заключения

Ну а я уже поколдовал над этими интерфейсами. И вы можете увидеть результат этого колдовства в моем фреймворке Propan.

С его использование RPC запросы будут выглядеть для вас следующим образом:

from propan import PropanApp, RabbitBroker  broker = RabbitBroker("amqp://guest:guest@localhost:5672/") app = PropanApp(rabbit_broker)  # server side @broker.handle("ping") async def heartbeat():     return "pong"  @app.after_startup async def self_ping():     # client RPC request     response = await broker.publish(queue="ping", callback=True)     assert response == "pong"

И теперь вы точно знаете, что у них под капотом.

Нюансы

RabbitMQ Direct Reply-to действительно отличный механзим, однако и у него есть ограничения.

На псевдочередь amqp.rabbitmq.reply-to можно подписываться из разных сервисов неограниченное число раз одновременно, однако, если вы хотите отправить несколько разных запросов в рамках одного сервиса (одного connection, если быть точным) одновременно, у вас не получится это сделать: вы словите ошибку, что очередь уже имеет потребителя.

Поэтому в рамках одного сервиса необходимо использовать локи на отправку RPC запросов, что, к слову, также реализовано в Propan.

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.
Знали ли вы об RMQ Direct Reply-to до этого?
76.92% Нет, первый раз слышу 10
7.69% Слышал 1
15.38% Конечно знаю и использую! 2
0% Кто такой этот ваш RabbitMQ? 0
Проголосовали 13 пользователей. Воздержался 1 пользователь.

ссылка на оригинал статьи https://habr.com/ru/articles/747644/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *