RabbitMQ. Часть 3. Разбираемся с Queues и Bindings

от автора

Queue (очередь) — структура данных на диске или в оперативной памяти, которая хранит ссылки на сообщения и отдает их копии consumers (потребителям). Queue представляет собой Erlang-процесс с состоянием (где могут кэшироваться и сами сообщения). 1 тысяча очередей может занимать порядка 80Mb.

Binding (привязка) — правило, которое сообщает обменнику в какую из очередей должны попадать сообщения.

Оглавление

  • RabbitMQ. Часть 1. Introduction. Erlang, AMQP и RPC
  • RabbitMQ. Часть 2. Разбираемся с Exchanges
  • RabbitMQ. Часть 3. Разбираемся с Queues и Bindings
  • RabbitMQ. Часть 4. Разбираемся с тем что-такое сообщения и фреймы
  • RabbitMQ. Часть 5. Производительность публикации и потребления сообщений
  • RabbitMQ. Часть 6. Обзор модулей Federation и Shovel
  • RabbitMQ. Часть 7. Подробно про Connection и Chanel
  • RabbitMQ. Часть 8. RabbitMQ в .NET
  • RabbitMQ. Часть 9. Мониторинг

Временные очереди

Если создание очереди происходит с установленным параметром autoDelete, то такая очередь обретает способность автоматически удалять себя. Такие очереди обычно создаются в момент подключения первого клиента и удаляются в момент, когда все клиенты отсоединились.

Если создание очереди происходит с установленным параметром exclusive, то такая очередь разрешает подключаться только одному потребителю и удаляется если закроется канал. До тех пор пока канал не закроется, клиент может отключаться/подключаться, но только в рамках того же самого соединения. Если параметр exclusive установлен, то параметр autoDelete не имеет никакого эффекта.

Особенности:

  • при кратковременном разрыве связи мы будем терять сообщения, которые ещё не успели дойти до потребителя
  • можно поймать феномен binding churn. Феномен возникает, когда количество операций по созданию/удалению очередей и привязок достигает очень больших значений. В кластерном режиме такой поток операций будет расползаться по всем узлам и создаст большую нагрузку. Данный процесс можно оптимизировать за счет контроля количества подписок

Постоянные очереди

Если создание очереди происходит с установленным параметром durable, то такая очередь сохраняет свое состояние и восстанавливается после перезапуска сервера/брокера. Данная очередь будет существовать до тех пор пока не будет вызвана команду Queue.Delete.

Highly Available очереди

Очереди HA требуют кластерной среды RabbitMQ. В кластерном режиме вся информация об обменниках, очередях, привязках и потребителях будет скопирована на все узлы.

Когда сообщение публикуется в какую-то HA очередь, оно хранится на каждом узле, относящемуся к HA очереди. После того как сообщение потребляется на каком-то из узлов, все копии этого сообщения будут удалены на других узлах.

Очереди HA могут распространяться на все узлы в некотором кластере или только на индивидуальные.

rabbitmq_26

Особенности:

  • использование HA очередей приводит к наказаниям в производительности. При помещении сообщения в некую HA очередь или при потреблении сообщения из HA очереди RabbitMQ должен выполнять координацию по всем узлам (2-3 узла обычно достаточно)

Создание очереди

Создание очереди происходит при помощи синхронного RPC запроса к серверу. Запрос осуществляется при помощи метода Queue.Declare, вызываемого с параметрами:

  • название очереди
  • другие параметры

Пример создания очереди при помощи RabbitMQ.Client:

// ... channel.QueueDeclare(     queue: "my_queue",     durable: false,     exclusive: false,     autoDelete: false,     arguments: null ); // ...

  • queue — название очереди, которую мы хотим создать. Название должно быть уникальным и не может совпадать с системным именем очереди
  • durable — если true, то очередь будет сохранять свое состояние и восстанавливается после перезапуска сервера/брокера
  • exclusive — если true, то очередь будет разрешать подключаться только одному потребителю
  • autoDelete — если true, то очередь обретает способность автоматически удалять себя
  • arguments — необязательные аргументы. Ниже разберем подробнее.

arguments

  • x-message-ttl(x-message-time-to-live) — позволяет установить время истечения срока жизни сообщения в миллисекундах. Если создание очереди происходит с установленным значением аргумента x-message-ttl, то такая очередь будет автоматически исключать сообщения, у которых истек срок действия. Установка значения аргумента x-message-ttl задает максимальный возраст для всех сообщений в данной очереди. Создание такой очереди позволяет предотвратить получение устаревшей информации. Это можно использовать в системах реального времени. Если у очереди для которой задан обменник для отклоненных сообщений установить значение аргумента x-message-ttl, то отклоненные сообщения в данной очереди начнут обладать сроком жизни.
  • x-expires — задает значение в миллисекундах по истечению которого происходит удаление очереди. Очередь может израсходовать срок своего действия только если она не имеет никаких подписчиков. Если к очереди подключены подписчики, она сможет автоматически удалиться только тогда, когда все подписчики вызовут Basic.Cancel или отсоединятся. Срок жизни очереди может завершиться только в том случае, если к ней не было запроса Basic.Get. Иначе текущее значение установки времени жизни обнуляется, и очередь больше не будет автоматически удаляться. Также нет гарантий того, насколько быстро происходит удаление очереди после истечения её срока жизни.
  • x-max-length — задает максимальное число сообщений в очереди. Если число сообщений в очереди начинает превышать макимальное чило, то начинают удаляться самые старые

rabbitmq_25

  • x-max-lenght-bytes — задает максимально допустимый суммарный размер полезной нагрузки сообщений в очереди. При превышении установленного значения (возникло переполнение очереди при очередной публикации сообщения) самые старые сообщения начнут удаляться
  • x-overflow — данный аргумент используется для настройки поведения в результате переполнения очереди. Доступны два значения: drop-head (значение по умолчанию) и reject-publish. Если выбрать drop-head, то самые старые сообщения будут удаляться. Если выбрать reject-publish, то прием сообщений будет приостановлен
  • x-dead-letter-exchange — задает exchange, в который направляются отвергнутые сообщения, которые не поставлены повторно в очередь
  • x-dead-letter-routing-key — задает не обязательный ключ маршрутизации для отвергнутых сообщений
  • x-max-priority — разрешает сортировку по приоритетам в очереди с максимальным значением приоритета 255 (RabbitMQ версий 3.5.0 и выше). Число указывает максимальный приоритет, который будет поддерживать очередь. Если аргумент не установлен, очередь не будет поддерживать приоритет сообщений
  • x-queue-mode — позволяет перевести очередь в ленивый режим. В таком режиме как можно больше сообщений будет храниться на диске. Использование оперативной памяти будет минимально. В случае, если он не установлен, очередь будет хранить сообщения в памяти, чтобы доставлять сообщения максимально быстро
  • x-queue-master-locator — если у нас кластер, то можно задать мастер очередь
  • x-ha-policy — используется при создании HA очередей и определяет как сообщение будет распространяться по узлам. Если установлено значение all, то сообщение будет сохраняться на всех узлах. Если установлено значение nodes, то сообщение будет сохраняться на определенных узлах кластера
  • x-ha-nodes — задает узлы, к которым будет относиться некая очередь HA

rabbitmq_10

Если создание очереди возможно, то сервер отправит клиенту синхронный RPC ответ Queue.DeclareOk. Если создание очереди невозможно (произошел отказ по запросу Queue.Declare), то канал закроется сервером при помощи команды Channel.Close и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки и ее описание.

Повторный вызов Queue.Declare с аналогичными параметрами вернет полезную информацию об этой очереди. Например, общее число сообщений, находящихся в ожидании в данной очереди, и общее число подписанных на неё потребителей.

Вызов Queue.Declare под учетными данными пользователя, которому не назначены необходимые права закроет канал при помощи команды Channel.Close и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки 403 и ее описание.

После того, как очередь простаивает в течении >= 10 секунд, она впадает в спящий режим, вызывая GC в очереди, что приводит к значительному сокращению памяти, необходимой для этой очереди.

Создание Queue через графический интерфейс

Заходим в панель администратора RabbitMQ под пользователем guest (username: guest и password: guest). Обратите внимание, что пользователь guest может подключаться только с локального хоста. Теперь перейдем на вкладку Queues и нажмем на Add a new queue. Заполняем свойства:

rabbitmq_21

Как только мы введем все необходимые данные и нажмете на Add queues, очередь появится в общем списке.

rabbitmq_22

Щелчок по имени очереди покажет ее детальную информацию. Здесь можно настроить привязку между обменом и очередью, посмотреть список consumers, опубликовать/получить сообщения, удалить очередь и посмотреть статистику.

Создание Binding

Создание привязки происходит при помощи синхронного RPC запроса к серверу. Запрос осуществляется при помощи метода Queue.Bind, вызываемого с параметрами:

  • название очереди
  • название точки обмена
  • другие параметры

Пример создания привязки при помощи RabbitMQ.Client:

//... channel.QueueBind(     queue: queueName,     exchange: "my_exchange",     routingKey: "my_key",     arguments: null ); //...

  • queue — имя очереди
  • exchange — имя обменника
  • routingKey — ключ маршрутизации
  • arguments — необязательные аргументы

rabbitmq_11

Если создание привязки возможно, то сервер отправит клиенту синхронный RPC ответ Queue.BindOk.

Создание Binding через графический интерфейс

Заходим в панель администратора RabbitMQ под пользователем guest (username: guest и password: guest). Обратите внимание, что пользователь guest может подключаться только с локального хоста. Теперь перейдем на вкладку Queues и жмем на очередь my_queue. Заполняем поля раздела bindings:

rabbitmq_23

Как только мы введем все необходимые данные и нажмем на Bind, привязка отобразится в общем списке:

rabbitmq_24

Code

В данном разделе опишем очередь и привязку кодом на C#, так если бы нам требовалось разработать библиотеку. Возможно это будет полезно для восприятия.

public interface IQueue     {                 string Name { get; }          /// <summary>         ///     Если установить true, то queue будет являться постоянным.          ///     Она будет храниться на диске и сможет          ///     пережить перезапуск сервера/брокера.          ///     Если значение false, то queue является временной и будет удаляться,          ///     когда сервер/брокер будет перезагружен         /// </summary>         bool IsDurable { get; }          /// <summary>         ///     Если значение равно true, то          ///     такая очередь будет разрешать подключаться          ///     только одному consumer-у         /// </summary>         bool IsExclusive { get; }          /// <summary>         ///     Автоматическое удаление.          ///     Очередь будет удалена, когда все клиенты отсоединятся.         /// </summary>         bool IsAutoDelete { get; }          /// <summary>         ///     Необязательные аргументы         /// </summary>         IDictionary<string, object> Arguments { get; }     }

public class Queue : IQueue     {         public Queue(              string name,               bool isDurable = true,               bool isExclusive = false,               bool isAutoDelete = false,               IDictionary<string, object> arguments = null)         {             Name = name ??                 throw new ArgumentNullException(name, $"{name} must not be null");              IsDurable = isDurable;             IsExclusive = isExclusive;             IsAutoDelete = isAutoDelete;             Arguments = arguments ?? new Dictionary<string, object>();         }          public string Name { get; }         public bool IsDurable { get; }         public bool IsExclusive { get; }         public bool IsAutoDelete { get; }         public IDictionary<string, object> Arguments { get; }     }

public static class QueueMode     {                public const string Default = "default";         /// <summary>         ///     Ленивый режим. Ленивый режим заставит сохранять          ///     как можно больше сообщений на диске, чтобы уменьшить          ///     использование оперативной памяти         /// </summary>         public const string Lazy = "lazy";     }

public interface IBinding     {         /// <summary>         ///     Обменник, который будет связываться привязкой         /// </summary>         IExchange Exchange { get; }          /// <summary>         ///     Ключ маршрутизации         /// </summary>         string RoutingKey { get; }          /// <summary>         ///     Необязательные аргументы         /// </summary>         IDictionary<string, object> Arguments { get; }     }

public class Binding : IBinding     {         public Binding(              IExchange exchange,               string routingKey,               IDictionary<string, object> arguments)         {             Exchange = exchange;             RoutingKey = routingKey;             Arguments = arguments;         }          public IExchange Exchange { get; }         public string RoutingKey { get; }         public IDictionary<string, object> Arguments { get; }     }

ссылка на оригинал статьи https://habr.com/ru/post/490960/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *