RabbitMQ на Rust: библиотека Lapin

от автора

Привет, Хабр!

Сегодня рассмотрим библиотеку Lapin в Rust. Lapin — это библиотека, реализующая протокол AMQP 0.9.1, она помогает взаимодействовать с RabbitMQ.

  1. Многоканальная работа: один TCP‑соединение поддерживает множество каналов.

  2. Поддержка подтверждений: безопасная обработка сообщений (ack/nack).

  3. Интеграция с async: асинхронный API, который вписывается в экосистему Rust.

  4. TLS: поддержка защищённых соединений через native-tls, openssl или rustls.


Основные сущности Lapin

Connection

Соединение с RabbitMQ устанавливается один раз и может использоваться для работы с несколькими каналами. Это основа любого взаимодействия.

Пример:

use lapin::{Connection, ConnectionProperties};  let addr = "amqp://user:password@localhost:5672/%2f"; let connection = Connection::connect(&addr, ConnectionProperties::default())     .await     .expect("Ошибка подключения");

Поддерживаются как TCP, так и TLS‑соединения.

Channel

Каналы — это основа взаимодействия с RabbitMQ. Через них создаются очереди, подписки и отправляются сообщения.

Пример:

let channel = connection.create_channel()     .await     .expect("Ошибка создания канала");

RabbitMQ рекомендует использовать отдельные каналы для отправки и получения сообщений. В одном соединении можно создавать множество каналов.

Queue

Очереди — это хранилища сообщений. Они бывают:

  • Durable: сохраняются при перезапуске сервера.

  • Exclusive: доступны только для текущего соединения.

  • Auto‑delete: удаляются, когда больше не используются.

Пример:

use lapin::options::QueueDeclareOptions; use lapin::types::FieldTable;  let queue = channel     .queue_declare(         "task_queue",         QueueDeclareOptions {             durable: true, // Устойчивая очередь             ..Default::default()         },         FieldTable::default(),     )     .await     .expect("Ошибка объявления очереди");

Используйте FieldTable, чтобы настроить TTL сообщений, ограничение размера очереди и т. п.

Exchange

Exchange — это маршрутизатор, который направляет сообщения в очереди на основе типов и правил маршрутизации:

  1. Direct: сообщения отправляются в конкретную очередь по ключу маршрутизации.

  2. Fanout: сообщение отправляется во все очереди, привязанные к exchange.

  3. Topic: сложная маршрутизация по шаблонам.

  4. Headers: маршрутизация на основе заголовков сообщения.

Пример настройки exchange и привязки к очереди:

use lapin::options::{ExchangeDeclareOptions, QueueBindOptions};  channel     .exchange_declare(         "my_exchange",         lapin::ExchangeKind::Direct, // Тип exchange         ExchangeDeclareOptions {             durable: true,             ..Default::default()         },         FieldTable::default(),     )     .await     .expect("Ошибка создания exchange");  channel     .queue_bind(         "task_queue",         "my_exchange",         "routing_key", // Ключ маршрутизации         QueueBindOptions::default(),         FieldTable::default(),     )     .await     .expect("Ошибка привязки очереди");

TLS:

Для продакшен‑окружения требуется защищённое соединение. Настроим соединение через rustls:

use lapin::{Connection, ConnectionProperties}; use lapin::tcp::{OwnedTLSConfig, OwnedTLSStream};  let addr = "amqps://user:password@rabbitmq.example.com:5671/"; let tls_config = OwnedTLSConfig::default();  let connection = Connection::connect(     addr,     ConnectionProperties::default().with_tls(tls_config), ) .await .expect("Ошибка подключения через TLS");

Обработка ошибок

Ошибки неизбежны.

  1. Используйте retry для повторного подключения или обработки сообщения.

  2. Обрабатывайте nack, если сообщение нельзя обработать.

Пример обработки ошибок:

while let Some(delivery) = consumer.next().await {     match delivery {         Ok(delivery) => {             if let Err(err) = process_message(&delivery).await {                 error!("Ошибка обработки сообщения: {:?}", err);                 delivery                     .nack(Default::default())                     .await                     .expect("Ошибка отправки nack");             } else {                 delivery                     .ack(Default::default())                     .await                     .expect("Ошибка отправки ack");             }         }         Err(err) => {             error!("Ошибка получения сообщения: {:?}", err);         }     } }

Пример применения

Напишем приложение, которое:

  • Создаёт exchange и очередь.

  • Маршрутизирует сообщения.

  • Обрабатывает входящие сообщения с QoS и подтверждениями.

use lapin::{     options::{BasicPublishOptions, QueueBindOptions, QueueDeclareOptions},     types::FieldTable,     BasicProperties, Connection, ConnectionProperties, }; use tokio; use tracing::info;  #[tokio::main] async fn main() {     tracing_subscriber::fmt::init();      let addr = "amqp://user:password@localhost:5672/%2f";     let connection = Connection::connect(&addr, ConnectionProperties::default())         .await         .expect("Ошибка подключения");      let channel = connection.create_channel().await.expect("Ошибка создания канала");      channel         .exchange_declare(             "logs",             lapin::ExchangeKind::Fanout,             Default::default(),             FieldTable::default(),         )         .await         .expect("Ошибка создания exchange");      let queue = channel         .queue_declare(             "",             QueueDeclareOptions {                 exclusive: true,                 ..Default::default()             },             FieldTable::default(),         )         .await         .expect("Ошибка объявления очереди");      channel         .queue_bind(             &queue.name(),             "logs",             "",             QueueBindOptions::default(),             FieldTable::default(),         )         .await         .expect("Ошибка привязки очереди");      info!("Очередь привязана к exchange");      tokio::spawn(async move {         for i in 0..5 {             let message = format!("Log message {}", i);             channel                 .basic_publish(                     "logs",                     "",                     BasicPublishOptions::default(),                     message.as_bytes(),                     BasicProperties::default(),                 )                 .await                 .expect("Ошибка отправки сообщения");         }     });      info!("Сообщения отправлены"); }

Создаём exchange типа Fanout, объявляем временную очередь и связываем её с exchange. После этого отправляем несколько сообщений в exchange, которые автоматически маршрутизируются во все привязанные очереди.


Подробнее с библиотекой можно ознакомиться здесь.

Рекомендую обратить внимание на открытые уроки, которые в феврале проведут в Otus преподаватели-практики:

  • 11 февраля: «Разбираем анатомию парсера на Rust».
    Разберём устройство игрушечного парсера на Rust, его ключевые компоненты и архитектурные принципы, обеспечивающие надёжность и производительность кода. Записаться

  • 17 февраля: «Инцидент-менеджмент в SRE — как быстро найти, устранить и предотвратить сбои в системе».
    Практическое руководство по эффективному управлению аварийными ситуациями в рамках Site Reliability Engineering (SRE). Записаться


ссылка на оригинал статьи https://habr.com/ru/articles/877014/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *