Обменник типа topic
Сообщения отправляемые обменнику типа topic должны иметь ключ маршрутизации, представляющий из себя набор слов разделённых точками. Слова могут быть любыми, но обычно они являются связанными с какой-нибудь особенностью сообщения. Вот примеры некоторых допустимых ключей маршрутизации: «stock.usd.nyse», «nyse.vmw», «quick.orange.rabbit». Ключ ограничен размером в 255 байт.
Ключ связывания должен быть указан похожим образом. Маршрутизация сообщений в обменнике типа topic похожа на маршрутизацию обменника типа direct — сообщение направляется в очередь с ключом связывания совпадающим с ключом маршрутизации. Однако есть два отличия:
- *(звёздочка) в ключе связывания может быть заменена только одним словом
- #(решётка) в ключе связывания может быть заменена нулём или более слов
Это можно проиллюстрировать следующим изображением:

(изображение взято с официального сайта RabbitMQ)
В этом примере мы будем отсылать сообщения которые представляют животных. Ключи маршрутизации сообщений состоят из трёх слов(и двух точек). Первое слово представляет скорость, второе — цвет, третье — вид.
В нашем примере с обменником связаны две очереди: Q1 с ключом связывания "*.orange.*" и Q2 с двумя ключами связывания "*.*.rabbit" и «lazy.#».
Эти связи будут означать следующую маршрутизацию:
- Q1 заинтересована во всех оранжевых (orange) животных
- Q2 заинтересована во всех кроликах (rabbit) и ленивых (lazy) животных
Сообщение с ключом маршрутизации «quick.orange.rabbit» будет доставлено в обе очереди. Сообщение с ключом «lazy.orange.elephant» также будет доставлено в обе.
Однако сообщение с ключом «quick.orange.fox» попадёт только в первую очередь, а с ключом «lazy.brown.fox» только во вторую. «lazy.pink.rabbit» будет доставлено во вторую очередь один раз, не смотря на то, что ключ маршрутизации совпадает с обоими ключами связывания.
«quick.brown.fox» не попадёт ни в одну очередь, так как ключ маршрутизации не совпадает ни с одним из ключей связывания. Если попытаться отослать сообщения с количеством слов в ключе маршрутизации меньшим или большим (например «orange» или «quick.orange.male.rabbit») чем в ключе связывание сообщение будет отброшено.
Обменник типа topic может вести себе как обменник типа fanout если в качестве ключа связывания указать #. Или как direct если в ключе связывания не указывать ни * ни #, а указать просто какое-нибудь слово.
Собираем код вместе
Будем использовать обменник типа topic для нашей системы логгирования. Начнём с предположения о том что наши ключи маршрутизации сообщений будут иметь вид «источник.строгость». Код будет почти такой же как в предыдущей части, вот send.php:
try { $producer = \MonsterMQ\Client\Producer(); $producer->connect('127.0.0.1', 5672); $producer->logIn('guest', 'guest'); $producer->newTopicExchange('topic-logs'); $routingKey = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info'; $message = implode(' ', array_slice($argv, 2)); $message = empty($message) ? "Hello World!" : $message; $producer->publish($message, $routingKey, 'topic-logs'); echo "\n Sent {$message} \n"; } catch(\Exception $e) { var_dump($e); }
Код worker-1.php
try { $consumer = \MonsterMQ\Client\Consumer(); $consumer->connect('127.0.0.1', 5672); $consumer->logIn('guest', 'guest'); $producer->queue('queue-1')->setExclusive()->declare(); $consumer->newTopicExchange('topic-logs'); $bindingKeys = array_slice($argv, 1); if (empty($bindingKeys)) { file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n"); exit(1); } foreach ($bindingKeys as $key) { $producer->queue('queue-1')->bind('topic-logs', $key); } $consumer->consume('queue-1'); echo " \n Waiting for logs. To exit press CTRL+C\n"; $consumer->wait(function ($message, $channelNumber) use ($consumer){ echo "\n $message \n"; }); } catch(\Exception $e) { var_dump($e); }
Код worker-2.php
try { $consumer = \MonsterMQ\Client\Consumer(); $consumer->connect('127.0.0.1', 5672); $consumer->logIn('guest', 'guest'); $producer->queue('queue-2')->setExclusive()->declare(); $consumer->newTopicExchange('topic-logs'); $bindingKeys = array_slice($argv, 1); if (empty($bindingKeys)) { file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n"); exit(1); } foreach ($bindingKeys as $key) { $producer->queue('queue-2')->bind('topic-logs', $key); } $consumer->consume('queue-2'); echo " \n Waiting for logs. To exit press CTRL+C\n"; $consumer->wait(function ($message, $channelNumber) use ($consumer){ echo "\n $message \n"; }); } catch(\Exception $e) { var_dump($e); }
Чтобы получать только critical сообщения первым воркером вызовите
php worker-1.php "*.critical"
Чтобы связать очередь, которую использует второй воркер, с обменником двумя ключами связывания вызовите:
php worker-2.php "kern.*" "*.critical"
Для отправки сообщения выполните что-нибудь вроде:
php send.php "kern.critical" "A critical kernel error"
Поэкспериментируйте с этими программами.
ссылка на оригинал статьи https://habr.com/ru/post/490678/
Добавить комментарий