Использование RabbitMQ вместе с MonsterMQ часть 3

от автора

В нашей предыдущей статье мы создали очередь задач. Она предполагает что одна задача в виде сообщения доставляются одному получателю. В этой статье мы сделаем кое-что другое, мы будем отсылать одно сообщения сразу нескольким получателям. Мы создадим систему логгирования которая будет состоять из двух программ: первая будет отправлять сообщения, а вторая получать и выводить их. В нашей системе, все запущенные получатели будут получать сообщение отосланное отправителем.

Exchanges(обменники)

В предыдущих статьях мы отправляли и получали сообщение из очередей, теперь пришло время представить полную модель переправки сообщений в RabbitMQ.

Давайте быстро пройдёмся по тому что мы рассматривали в предыдущих статьях:

  • Producer — приложение которое посылает сообщения
  • Queue — буфер который хранит сообщения
  • Consumer — приложение которое получает и обрабатывает сообщения

Ключевая идея RabbitMQ (а точнее AMQP) состоит в том, что отправитель(Producer) никогда не отправляет сообщение напрямую в очередь. Более того он даже не знает попало ли оно в очередь. Вместо этого отправитель отсылает сообщения обменникам (Exchange). Обменник очень простая вещь, он делает две вещи: получает сообщение от отправителей и перенаправляет их в очередь. Обменники бывают разных типов: одни направляют сообщения в определённую очередь (тип direct), другие направляют одно и то же сообщения сразу в несколько очередей (тип fanout), третьи перенаправляют сообщения в очереди опираясь на определённые, заданные правила перенаправления (тип topic).

image
(изображение взято с официального сайта RabbitMQ)

Давайте рассмотрим обменник типа fanout. Для того чтобы его создать напишем следующий код:

$producer->newFanoutExchange('logs'); 

Fanout обменник работает по очень простой схеме, он просто отсылает получаемые сообщение всем привязанным к нему очередям.

Чтобы перечислить все обменники существующие на сервере вызовите rabbitmqctl

sudo rabbitmqctl list_exchanges

В этом списке будут amq.* обменники и стандартный безымянный (пустая строка) обменник. Не обращайте на них внимания они нам пока не нужны.

В прошлой статье мы ничего не знали об обменниках, но тем не менее были способны отправлять сообщения. Так происходило потому что если в MonsterMQ вы не указываете явно обменник как третий аргумент метода Producer::publish() библиотека будет использовать стандартный безымянный обменник RabbitMQ, который отсылает сообщения в очереди, имена которых переданы как второй аргумент методу Producer::publish().

Теперь мы можем отсылать сообщения нашему новому обменнику, указав его имя как третий аргумент метода Producer::publish():

$producer->publish($message, '', 'logs'); 

Теперь давайте создадим двух воркеров с двумя очередями, которые будут существовать до тех пор, пока воркеры, которые их объявили, активны. Сделать это можно следующим образом:

//Worker 1 $producer->queue('queue-1')->setExclusive()->declare(); 

//Worker 2 $producer->queue('queue-2')->setExclusive()->declare(); 

Теперь когда воркер, объявивший очередь, завершит сессию и отключится, она автоматически удалится. Если завершат работу оба воркера, удалятся обе очереди.

Связываем очередь с обменником

Мы уже создали обменник (exchange) и очередь (queue). Теперь нам осталось сказать обменнику, чтобы он отсылал полученные сообщения в наши очереди. Нам нужно связать (bind) обменник и очереди. Для этого напишем следующий код:

//Worker 1 $producer->queue('queue-1')->bind('logs'); 

//Worker 2 $producer->queue('queue-2')->bind('logs'); 

Отныне наш обменник logs связан с нашими очередями и будет переправлять полученные сообщения в них.

Вы можете перечислить все существующие связи следующей командой на linux

rabbitmqctl list_bindings

Соединяем код вместе

Наш скрипт отправителя претерпел не так много изменений по сравнению с предыдущим уроком. Главное отличие в том, что сейчас он отсылает сообщения, предварительно объявленному обменнику, а не стандартному (доступному из коробки). Вот код send.php

try {    $producer = \MonsterMQ\Client\Producer();     $producer->connect('127.0.0.1', 5672);    $producer->logIn('guest', 'guest');     $producer->newFanoutExchange('logs');     $message = implode(' ', array_slice($argv, 1));    $message = empty($message) ? 'Hello world!' : $message;     $producer->publish($message, '', 'logs');     echo "\n Sent {$message} \n"; } catch(\Exception $e) {    var_dump($e); } 

Стоит упомянуть, что послав сообщение обменнику, не связанному с какой-либо очередью, сообщение потеряется. Но сейчас это нас устраивает, так как мы ещё не запускали наших получателей.

Вот код нашего первого воркера worker-1.php

try {    $consumer = \MonsterMQ\Client\Consumer();     $consumer->connect('127.0.0.1', 5672);    $consumer->logIn('guest', 'guest');     $producer->queue('queue-1')->setExclusive()->declare();    $producer->queue('queue-1')->bind('logs');     $consumer->consume('queue-1');     $consumer->wait(function ($message, $channelNumber) use ($consumer){       echo "\n $message \n";    }); } catch(\Exception $e) {    var_dump($e); } 

Вот код второго воркера worker-2.php

try {    $consumer = \MonsterMQ\Client\Consumer();     $consumer->connect('127.0.0.1', 5672);    $consumer->logIn('guest', 'guest');     $producer->queue('queue-2')->setExclusive()->declare();    $producer->queue('queue-2')->bind('logs');     $consumer->consume('queue-2');     $consumer->wait(function ($message, $channelNumber) use ($consumer){       echo "\n $message \n";    }); } catch(\Exception $e) {    var_dump($e); } 

Если вы хотите сохранить сообщения отправленные первому воркеру в лог-файл просто вызовите следующую команду в консоли:

php worker-1.php > logs_from_rabbit.log

Чтобы запустить обоих воркеров и выводить сообщения в окна терминалов вызовите следующие команды, каждое в своём окне:

php worker-1.php

php worker-2.php

А чтобы отсылать сообщения вызовите следующую команду:

php send.php

В следующем уроке мы разберём как получать только определённое подмножество сообщений из всех отправляемых.

ссылка на оригинал статьи https://habr.com/ru/post/489692/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *