Exchanges(обменники)
В предыдущих статьях мы отправляли и получали сообщение из очередей, теперь пришло время представить полную модель переправки сообщений в RabbitMQ.
Давайте быстро пройдёмся по тому что мы рассматривали в предыдущих статьях:
- Producer — приложение которое посылает сообщения
- Queue — буфер который хранит сообщения
- Consumer — приложение которое получает и обрабатывает сообщения
Ключевая идея RabbitMQ (а точнее AMQP) состоит в том, что отправитель(Producer) никогда не отправляет сообщение напрямую в очередь. Более того он даже не знает попало ли оно в очередь. Вместо этого отправитель отсылает сообщения обменникам (Exchange). Обменник очень простая вещь, он делает две вещи: получает сообщение от отправителей и перенаправляет их в очередь. Обменники бывают разных типов: одни направляют сообщения в определённую очередь (тип direct), другие направляют одно и то же сообщения сразу в несколько очередей (тип fanout), третьи перенаправляют сообщения в очереди опираясь на определённые, заданные правила перенаправления (тип topic).
(изображение взято с официального сайта RabbitMQ)
Давайте рассмотрим обменник типа fanout. Для того чтобы его создать напишем следующий код:
$producer->newFanoutExchange('logs');
Fanout обменник работает по очень простой схеме, он просто отсылает получаемые сообщение всем привязанным к нему очередям.
Чтобы перечислить все обменники существующие на сервере вызовите rabbitmqctl
sudo rabbitmqctl list_exchanges
В этом списке будут amq.* обменники и стандартный безымянный (пустая строка) обменник. Не обращайте на них внимания они нам пока не нужны.
В прошлой статье мы ничего не знали об обменниках, но тем не менее были способны отправлять сообщения. Так происходило потому что если в MonsterMQ вы не указываете явно обменник как третий аргумент метода Producer::publish() библиотека будет использовать стандартный безымянный обменник RabbitMQ, который отсылает сообщения в очереди, имена которых переданы как второй аргумент методу Producer::publish().
Теперь мы можем отсылать сообщения нашему новому обменнику, указав его имя как третий аргумент метода Producer::publish():
$producer->publish($message, '', 'logs');
Теперь давайте создадим двух воркеров с двумя очередями, которые будут существовать до тех пор, пока воркеры, которые их объявили, активны. Сделать это можно следующим образом:
//Worker 1 $producer->queue('queue-1')->setExclusive()->declare();
//Worker 2 $producer->queue('queue-2')->setExclusive()->declare();
Теперь когда воркер, объявивший очередь, завершит сессию и отключится, она автоматически удалится. Если завершат работу оба воркера, удалятся обе очереди.
Связываем очередь с обменником
Мы уже создали обменник (exchange) и очередь (queue). Теперь нам осталось сказать обменнику, чтобы он отсылал полученные сообщения в наши очереди. Нам нужно связать (bind) обменник и очереди. Для этого напишем следующий код:
//Worker 1 $producer->queue('queue-1')->bind('logs');
//Worker 2 $producer->queue('queue-2')->bind('logs');
Отныне наш обменник logs связан с нашими очередями и будет переправлять полученные сообщения в них.
Вы можете перечислить все существующие связи следующей командой на linux
rabbitmqctl list_bindings
Соединяем код вместе
Наш скрипт отправителя претерпел не так много изменений по сравнению с предыдущим уроком. Главное отличие в том, что сейчас он отсылает сообщения, предварительно объявленному обменнику, а не стандартному (доступному из коробки). Вот код send.php
try { $producer = \MonsterMQ\Client\Producer(); $producer->connect('127.0.0.1', 5672); $producer->logIn('guest', 'guest'); $producer->newFanoutExchange('logs'); $message = implode(' ', array_slice($argv, 1)); $message = empty($message) ? 'Hello world!' : $message; $producer->publish($message, '', 'logs'); echo "\n Sent {$message} \n"; } catch(\Exception $e) { var_dump($e); }
Стоит упомянуть, что послав сообщение обменнику, не связанному с какой-либо очередью, сообщение потеряется. Но сейчас это нас устраивает, так как мы ещё не запускали наших получателей.
Вот код нашего первого воркера worker-1.php
try { $consumer = \MonsterMQ\Client\Consumer(); $consumer->connect('127.0.0.1', 5672); $consumer->logIn('guest', 'guest'); $producer->queue('queue-1')->setExclusive()->declare(); $producer->queue('queue-1')->bind('logs'); $consumer->consume('queue-1'); $consumer->wait(function ($message, $channelNumber) use ($consumer){ echo "\n $message \n"; }); } catch(\Exception $e) { var_dump($e); }
Вот код второго воркера worker-2.php
try { $consumer = \MonsterMQ\Client\Consumer(); $consumer->connect('127.0.0.1', 5672); $consumer->logIn('guest', 'guest'); $producer->queue('queue-2')->setExclusive()->declare(); $producer->queue('queue-2')->bind('logs'); $consumer->consume('queue-2'); $consumer->wait(function ($message, $channelNumber) use ($consumer){ echo "\n $message \n"; }); } catch(\Exception $e) { var_dump($e); }
Если вы хотите сохранить сообщения отправленные первому воркеру в лог-файл просто вызовите следующую команду в консоли:
php worker-1.php > logs_from_rabbit.log
Чтобы запустить обоих воркеров и выводить сообщения в окна терминалов вызовите следующие команды, каждое в своём окне:
php worker-1.php
php worker-2.php
А чтобы отсылать сообщения вызовите следующую команду:
php send.php
В следующем уроке мы разберём как получать только определённое подмножество сообщений из всех отправляемых.
ссылка на оригинал статьи https://habr.com/ru/post/489692/
Добавить комментарий