RabbitMQ tutorial 3 — Публикация/Подписка

от автора

Хочу продолжить серию перевода уроков с официального сайта. Примеры будут на php, но их можно реализовать на большинстве популярных ЯП.

Публикация/Подписка

В предыдущей статье было рассмотрено создание рабочей очереди сообщений. Было сделано допущение, что каждое сообщение будет направлено одному обработчику(worker). В этой статье усложним задачу – отправим сообщение нескольким подписчикам. Этот паттерн известен как "publish/subscribe" (публикация/подписка).
Чтобы понять этот шаблон, создадим простую систему логирования. Она будет состоять из двух программ – первая будет создавать логи, вторая считывать и печатать их.
В нашей систему логирования каждая программа подписчик будет получать каждое сообщение. Благодаря этому, мы сможем запустить одного подписчика на сохранение логов на диск, а потом в любое время сможем создать другого подписчика для отображения логов на экран.
По существу, каждое сообщение будет транслироваться каждому подписчику.

Точки обмена(exchanges)

В предыдущих статьях для отправки и принятия сообщений мы работали с очередью. Теперь рассмотрим расширенную модель отправки сообщений Rabbit.
Напомним термины предыдущей статьи:

  • Producer (поставщик) ‒ программа, отправляющая сообщения
  • Queue (очередь) – буффер, хранящий сообщение
  • Consumer (подписчик) ‒ программа, принимающая сообщения.

Основная идея в модели отправки сообщений Rabbit – Поставщик(producer) никогда не отправляет сообщения напрямую в очередь. Фактически, довольно часто поставщик не знает, дошло ли его сообщение до конкретной очереди.
Вместо этого поставщик отправляет сообщение в точку доступа. В точке доступа нет ничего сложного. Точка доступа выполняет две функции:
— получает сообщения от поставщика
— отправляет эти сообщения в очередь.
Точка доступа точно знает, что делать с поступившими сообщениями. Отправить сообщение в конкретную очередь, либо в несколько очередей, либо не отправлять никому и удалить его. Эти правила описываются в типе точки доступа (exchange type).

image

Существуют несколько типов: direct, topic, headers и fanout. Мы остановимся на последнем типе fanout. Создадим точку с доступа с этим типом и назовем её – logs:

$channel->exchange_declare('logs', 'fanout', false, false, false); 

Тип fanout – очень прост. Он копирует все сообщения которые поступают к нему во все очереди, которые ему доступны. Это то что нам нужно для нашей системы логирования.

Просмотр списка точек доступа:

Чтобы посмотреть все точки доступа на сервере, необходимо выполнить команду rabbitmqctl:

$ sudo rabbitmqctl list_exchanges Listing exchanges ...         direct amq.direct      direct amq.fanout      fanout amq.headers     headers amq.match       headers amq.rabbitmq.log        topic amq.rabbitmq.trace      topic amq.topic       topic logs    fanout ...done. 

Мы видим список точек доступа с наименованием amq.* и точку доступа без имени, которая используется по умолчанию (она не подходит для выполнения нашей задачи).

Наименование точек доступа.

В предыдущих статьях мы ничего не знали о точках доступа, но всё-таки могли отправлять письма в очередь. Это было возможно, потому что использовали точку доступа по умолчанию, которая идентифицируется пустой строкой “”.
Вспомним как раньше мы отправляли письма:

$channel->basic_publish($msg, '', 'hello'); 

Здесь используется точка доступа по умолчанию или безымянная точка доступа: сообщение направляется в очередь, идентифицированную через ключ “routing_key”. Ключ “routing_key” передается через третий параметр функции basic_publish.

Теперь мы можем отправить сообщение в нашу именованную точку доступа.

$channel->exchange_declare('logs', 'fanout', false, false, false); $channel->basic_publish($msg, 'logs');  

Временные очереди:

Всё это время мы использовали наименование очередей (“hello“ или “task_queue”). Возможность давать наименования помогает указать обработчикам (workers) определенную очередь, а также делить очередь между продюсерами и подписчиками.
Но наша система логирования требует, чтобы в очередь поступали все сообщения, а не только часть. Также мы хотим, чтобы сообщения были актуальными, а не старыми. Для этого нам понадобиться 2 вещи:
Каждый раз когда мы соединяемся с Rabbit, мы создаем новую очередь, или даем создать серверу случайное наименование.
Каждый раз когда подписчик отключается от Rabbit, мы удаляем очередь.
В php-amqplib клиенте, когда мы обращаемся к очереди без наименовании, мы создаем временную очередь и автоматически сгенерированным наименованием:

list($queue_name, ,) = $channel->queue_declare(""); 

Метод вернет автоматически сгенерированное имя очереди. Она может быть такой – ‘amq.gen-JzTY20BRgKO-HjmUJj0wLg.’.
Когда заявленное соединение оборвется, очередь автоматически удалиться.

Переплеты(Bindings)

image

Итак, у нас есть точка доступа с типом fanout и очередь. Сейчас нам нужно сказать точке доступа, чтобы она отправила сообщение в очередь. Отношение между точкой доступа и очередью называется bindings.

$channel->queue_bind($queue_name, 'logs'); 

С этого момента, сообщения для нашей очереди проходят через точку доступа
Посмотреть список binding-ов можно используя команду rabbitmqctl list_bindings

Отправка во все очереди:

image
Программа продюсер, которая создает сообщения, не изменилась с предыдущей статьи. Единственное важное отличие – теперь мы направляем сообщения в нашу именованную точку доступа ‘logs’, вместо точки доступа по умолчанию. Нам нужно было указать имя очереди при отправки сообщения. Но для точки доступа с типом fanout в этом нет необходимости. Рассмотрим код скрипта emit_log.php:

<?php  require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage;  $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel();  $channel->exchange_declare('logs', 'fanout', false, false, false);  $data = implode(' ', array_slice($argv, 1)); if(empty($data)) $data = "info: Hello World!"; $msg = new AMQPMessage($data);  $channel->basic_publish($msg, 'logs');  echo " [x] Sent ", $data, "\n";  $channel->close(); $connection->close();  ?>  

(emit_log.php source)

Как вы видите, после установки соединения мы создаем точку доступа. Этот шаг необходим, так как использование несуществующей точки доступа – запрещено.
Сообщение в точке доступа будут потеряны, так как ни одна очередь не связана с точкой доступа. Но это хорошо для нас: пока нет ни одного подписчика нашей точки доступа, все сообщения могут безопасно удалятся.
Код подписчика receive_logs.php:

<?php  require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection;  $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel();  $channel->exchange_declare('logs', 'fanout', false, false, false);  list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);  $channel->queue_bind($queue_name, 'logs');  echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";  $callback = function($msg){   echo ' [x] ', $msg->body, "\n"; };  $channel->basic_consume($queue_name, '', false, true, false, false, $callback);  while(count($channel->callbacks)) {     $channel->wait(); }  $channel->close(); $connection->close();  ?>  

(receive_logs.php source)

Если вы хотите сохранить логи в файл, вам потребуется открыть консоль и набрать:

$ php receive_logs.php > logs_from_rabbit.log

Если вы хотите отобразить логи на экран, откройте еще одно окно и наберите:
$ php receive_logs.php

Ну и конечно запуск продюсера сообщений:
$ php emit_log.php

С помощью команды rabbitmqctl list_bindings мы можем удостовериться, что код правильно создал очередь и связал её с точкой доступа. С двумя открытыми программами receive_logs.php у вас должно получиться следующее:

$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.

Здесь отображено, что данные точки доступа logs отправляются в две очереди, имена которых созданы автоматически. Это именно то, чего мы добивались.
В следующей статье будет описано, как прослушать только часть сообщений.

ссылка на оригинал статьи http://habrahabr.ru/post/200870/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *