Дружим RabbitMQ и Flutter/Dart

от автора

Привет, Хабр! Я Юрий Петров, руководитель отдела мобильной разработки в Friflex и автор телеграм-канала «Мобильный разработчик».
В этой статье хотел бы поделиться с вами опытом работы с брокером сообщений RabbitMQ из Dart-кода. 

Разберу вопросы:

  1. Как установить и запустить контейнер с RabbitMQ

  2. Как настроить RabbitMQ.

  3. Как создать Producer (отправителя) на Flutter.

  4. Как создать Consumer (потребителя) на Flutter.

Давайте представим, что вы пишете маленький сервис на Dart, задача которого — отправить миллион сообщений. Не важно, каким способом — с помощью пушей или любого другого сервиса доставки. Но есть условие, что сообщения должны отправляться не все сразу, а по некоторому алгоритму.  Например, ровно в 12 дня или, если у пользователя ночь, то только в дневное время. На самом деле не важен алгоритм, главное — понять, что у нас есть некая задержка отправки сообщений.

Есть простой способ это организовать. Например, мы будем использовать для пула сообщений задержку:

await Future.delayed(Duration(hours: 2));

По окончании задержки система отправит сообщения через два часа после получения. И в целом это будет работать, но это не самый лучший способ.  Просто представьте, что таких сообщений может быть миллион, или, например, задержку отправки необходимо сделать на 10 дней, пока пользователь в отпуске. Понятно, что это не самый эффективный вариант.

Для решения таких проблем программисты придумали планировщики задач. Их называют брокеры сообщений, самый подходящий для нас — это RabbitMQ, так как у него есть нативный плагин Delayed Message Exchange, который отлично подходит для выполнения задач через установленное время. 

Как работает сам брокер и что это вообще такое, я рассказывать не буду, так как это отдельная тема. Но вы можете почитать, например, здесь.

Для решения этой задачи, будем использовать следующие понятия:

Producer (Отправитель): приложение, которое будет отправлять сообщение в брокер. Сообщение должно быть доставлено потребителю не сразу, а  через пять секунд.

Broker: сервис, который получает сообщение и хранит его в очереди  до момента, когда нужно его отправить. Используем RabbitMQ.

Consumer (Потребитель): приложение, которое слушает очередь в брокере, получает готовые к доставке сообщения и отображает их.

Установка и запуск контейнера с RabbitMQ

И первое, с чего мы начнем, это запустим сервис RabbitMQ как докер-контейнер. Но сначала вы должны у себя установить Docker Desktop, перейти на сайт и выбрать необходимую сборку. Делается это очень просто, я думаю, труда у вас это не составит.

После того как установили docker в систему, проверяем в терминале командой:

docker --version

Вывод в консоль должен быть примерно такой:

Docker version 27.4.0, build bde2b89.

После того как разобрались с докером, нам необходимо запустить контейнер с RabbitMQ. И здесь у вас есть выбор. Вы можете использовать оригинальный контейнер, запустив команду в терминале, которая автоматически скачает и развернет докер-контейнер. И далее самим установить плагин Delayed Message Exchange.

docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management

И можно использовать готовый образ с уже установленным плагином.

docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 heidiks/rabbitmq-delayed-message-exchange:4.0.2-management

Обе эти команды запускают контейнер с RabbitMQ, используя нужный докер-образ с параметрами:

-d: Флаг -d (или --detach) указывает Docker запускать контейнер в фоновом режиме, не блокируя терминал. Контейнер работает как отдельный процесс.
--name my-rabbit: Этот параметр задает имя для контейнера.

-p 5672:5672 -p 15672:15672: Пробрасывает порты между хост-машиной и контейнером.

После успешного выполнения команды проверьте, запущен ли у вас контейнер командой:

docker ps

В терминале вы должны увидеть запущенный контейнер в колонке NAMES.

Настройка RabbitMQ

Теперь нам необходимо настроить RabbitMQ. Для это переходим на веб-интерфейс — вводим  адрес в браузере: http://localhost:15672/

Вы должны увидеть окно для ввода логина и пароля.

Веб интерфейс RabbitMQ — ввод логина и пароля

Вводим данные по умолчанию: username — guest и password — guest.

Попадаем на главное окно:

Веб интерфейс RabbitMQ — главное окно

Далее переходим во вкладку Exchanges и добавляем новый обменник.

Заполняем поля:

  •  Name: delayed_exchange.

  •  Type: выбираем x-delayed-message. Если в списке нет x-delayed-message, значит, плагин не установлен — проверьте, что все установлено и включено.

  • Arguments: добавляем ключ: 

    • key: x-delayed-type

    • value: direct.

Веб интерфейс RabbitMQ — настройки Exchanges

Нажимаем на кнопку Add exchange. Убеждаемся, что в списке появился delayed_exchange.

Веб интерфейс RabbitMQ — проверка

Осталось настроить очередь для сообщений. Для этого переходим на вкладку Queues.

Добавляем новую очередь. Заполняем поля:

  • Name: messages_queue.

  • Durability: Durable.

Нажимаем Add queue.

Веб интерфейс RabbitMQ — настройки Queues

Отлично, у нас есть настроенный exchange и queue. Осталось их соединить.

Для этого переходим в созданную очередь.

Веб интерфейс RabbitMQ — переход на Queues

Далее на странице очереди messages_queue находим секцию Bindings.

Заполняем поля:

  • From exchange: delayed_exchange

  • Routing key: messages_queue

Нажимаем кнопку Bind.

Веб интерфейс RabbitMQ — пример связки

Таким образом, когда мы публикуем в delayed_exchange, сообщение придет в messages_queue, после того как истечет задержка x-delay.

Если вам не нравится работать в веб-интерфейсе, или вы хотели бы этот процесс автоматизировать, можно это все сделать через терминал командами:

Подключаемся к контейнеру my-rabbit:

docker exec -it my-rabbit bash

Объявляем exchange внутри контейнера:

rabbitmqadmin declare exchange \     name=delayed_exchange \     type=x-delayed-message \     arguments='{"x-delayed-type":"direct"}'

Создаем очередь:

rabbitmqadmin declare queue name=messages_queue durable=true

Создаем связку:

rabbitmqadmin declare binding source=delayed_exchange destination=messages_queue routing_key="messages_queue"

Все готово, теперь нам необходимо реализовать Producer на Flutter.

Создание Producer (отправителя) на Flutter

Для реализации отправителя, создадим простой проект на Flutter с одной кнопкой, которая будет отправлять сообщение с нужной нам задержкой. Для работы с брокером будем использовать библиотеку dart_amqp

Добавляем библиотеку в проект командой:

flutter pub add dart_amqp

Создаем функцию, чтобы создать клиент для работы с RabbitMQ:

_initClient
Future<Exchange> _initClient() async {   try {     /// Настройки подключения     final settings = ConnectionSettings(       // Если у вас RabbitMQ запущен на на локальном компьютере       // Укажите нужный IP-адрес или hostname       host: 'localhost',       // Порт       port: 5672,       // Данные для входа в RabbitMQ       authProvider: const PlainAuthenticator('guest', 'guest'),     );      /// Создаем клиента для подключения к RabbitMQ     Client client = Client(settings: settings);     // Подключаемся     await client.connect();     // Создаем канал, по которому будем отправлять сообщения     final channel = await client.channel();     // Декларируем exchange. Если он уже создан, то повторная декларация     // должна совпадать по параметрам.     // Если не совпадает — будет ошибка.     // Если exchange не существует, то он будет создан     final exchange = await channel.exchange(       'delayed_exchange',       // Используем ExchangeType типа x-delayed-message       ExchangeType.custom('x-delayed-message'),       durable: true,       arguments: {         'x-delayed-type': 'direct',       },     );      log('Успешно подключились к RabbitMQ');     return exchange;   } on Object catch (error, stackTrace) {     log('Ошибка подключения к RabbitMQ', error: error, stackTrace: stackTrace);     rethrow;   } } 

Далее создаем простой виджет с одной кнопкой, которая будет отправлять сообщение в брокер. Это будет простое сообщение с датой отправки.

_sendMessage
  void _sendMessage() {     // Публикуем сообщение. Ставим задержку 5000 мс (5 секунд) для примера     // Через 5 секунд сообщение будет доставлено в очередь из Exchange x-delayed-message     final int delayMs = 5000;     // Обязательно передаем задержку в хедере     final headers = {'x-delay': delayMs};     // Создаем сообщение которое будем отправлять, можно отправить объект     final message =         'Привет, я отложенное сообщение, которое было отправлено в ${DateTime.now()}';     // Настройки сообщения     final properties = MessageProperties()..headers = headers;      // Ключ очереди     final routingKey = 'messages_queue';     // Отправляем     exchange.publish(       message,       routingKey,       properties: properties,     );     log('Отправили сообщение с задержкой $delayMs мс');   }

Полный исходный код:

main.dart
import 'dart:developer';  import 'package:dart_amqp/dart_amqp.dart'; import 'package:flutter/material.dart';  void main() async {   /// Инициализируем клиента   final exchange = await _initClient();    runApp(MyApp(exchange: exchange)); }  Future<Exchange> _initClient() async {   try {     /// Настройки подключения     final settings = ConnectionSettings(       // Если у вас RabbitMQ запущен на не локальном компьютере       // укажите нужный IP-адрес или hostname       host: 'localhost',       // Порт       port: 5672,       // Данные для входа в RabbitMQ       authProvider: const PlainAuthenticator('guest', 'guest'),     );      /// Создаем клиента для подключения к RabbitMQ     Client client = Client(settings: settings);     // Подключаемся     await client.connect();     // Создаем канал, по которому будем отправлять сообщения     final channel = await client.channel();     // Декларируем exchange. Если он уже создан, то повторная декларация     // должна совпадать по параметрам.     // Если не совпадает — будет ошибка.     // Если exchange не существует, то он будет создан     final exchange = await channel.exchange(       'delayed_exchange',       // Используем ExchangeType типа x-delayed-message       ExchangeType.custom('x-delayed-message'),       durable: true,       arguments: {         'x-delayed-type': 'direct',       },     );      log('Успешно подключились к RabbitMQ');     return exchange;   } on Object catch (error, stackTrace) {     log(       'Ошибка подключения к RabbitMQ',       error: error,       stackTrace: stackTrace,     );     rethrow;   } }  class MyApp extends StatelessWidget {   // Exchange для публикаций   final Exchange exchange;    const MyApp({super.key, required this.exchange});    @override   Widget build(BuildContext context) {     return MaterialApp(       home: Scaffold(         appBar: AppBar(title: const Text('Flutter Producer Пример')),         body: Center(           child: ElevatedButton(             onPressed: _sendMessage,             child: Text('Отправить'),           ),         ),       ),     );   }    void _sendMessage() {     // Публикуем сообщение. Ставим задержку 5000 мс (5 секунд) для примера     // Через 5 секунд сообщение будет доставлено в очередь из Exchange x-delayed-message     final int delayMs = 5000;     // Обязательно передаем задержку в хедере     final headers = {'x-delay': delayMs};     // Создаем сообщение которое будем отправлять, можно отправить объект     final message =         'Привет, я отложенное сообщение, которое было отправлено в ${DateTime.now()}';     // Настройки сообщения     final properties = MessageProperties()..headers = headers;      // Ключ очереди     final routingKey = 'messages_queue';     // Отправляем     exchange.publish(       message,       routingKey,       properties: properties,     );     log('Отправили сообщение с задержкой $delayMs мс');   } } 

После запуска приложения отправляем сообщение в брокер.

Producer — внешний вид

После отправки переходим в web-интерфейс RabbitMQ, далее в Exchange: delayed_exchange. На графике вы должны увидеть, что сообщение было добавлено в exchange. И если перейти в очередь, можно увидеть, что данные ушли и в очередь.

Веб интерфейс RabbitMQ — графики delayed_exchange и queues

Отлично, теперь можно написать потребителя.

Создание Consumer (потребителя) на Flutter

Как и в случае с отправителем, создаем новое Flutter-приложение, и добавляем библиотеку dart_amqp. Создаем метод для создания потребителя с тегом ‘my_consumer‘.

_initConsumer()
Future<Consumer> _initConsumer() async {   // Инициализируем Client   final client = Client(     settings: ConnectionSettings(       // Так как мы тестируем, используем локальный хост       host: 'localhost',       port: 5672,       authProvider: const PlainAuthenticator('guest', 'guest'),     ),   );   // Создаем клиент   final channel = await client.channel();    // Создаем очередь   final queue = await channel.queue(     'messages_queue',     durable: true,   );   // Создаем потребителя, с тегом my_consumer   final consumer = await queue.consume(     consumerTag: 'my_consumer',   );   return consumer; }

Далее создаем простой StatefulWidget с текстовым полем по центру, и в методе initState() создаем подписку на поток из RabbitMQ:

main.dart
import 'dart:async';  import 'package:dart_amqp/dart_amqp.dart'; import 'package:flutter/material.dart';  void main() async {   // Инициализируем Consumer   final consumer = await _initConsumer();    runApp(MyApp(consumer: consumer)); }  Future<Consumer> _initConsumer() async {   // Инициализируем Client   final client = Client(     settings: ConnectionSettings(       // Так как мы тестируем, используем локальный хост       host: 'localhost',       port: 5672,       authProvider: const PlainAuthenticator('guest', 'guest'),     ),   );   // Создаем клиент   final channel = await client.channel();    // Создаем очередь   final queue = await channel.queue(     'messages_queue',     durable: true,   );   // Создаем потребителя, с тегом my_consumer   final consumer = await queue.consume(     consumerTag: 'my_consumer',   );   return consumer; }  class MyApp extends StatefulWidget {   const MyApp({super.key, required this.consumer});   final Consumer consumer;    @override   State<MyApp> createState() => _MyAppState(); }  class _MyAppState extends State<MyApp> {   String message = 'Нет данных';   late final StreamSubscription streamSubscription;    @override   initState() {     super.initState();     // Подписываемся на получение сообщений     streamSubscription = widget.consumer.listen((AmqpMessage amqpMessage) {       message =           'Полученное сообщение: ${amqpMessage.payloadAsString} \nВремя получения: ${DateTime.now().toIso8601String()}';        setState(() {});     });   }    @override   void dispose() {     streamSubscription.cancel();     super.dispose();   }    @override   Widget build(BuildContext context) {     return MaterialApp(       home: Scaffold(         appBar: AppBar(           title: const Text(             'Flutter Consumer Пример',           ),         ),         body: Center(child: Text(message)),       ),     );   } } 

Запускаем потребителя, и если подключение прошло успешно, можно перейти в очередь и убедиться, что мы подключены как потребители.

Веб интерфейс RabbitMQ — подключенные потребители

Нажимаем на кнопку «Отправить» на отправителе.

Пример работы отправителя и получателя.

Обратите внимание, что сообщение было отправлено в 18:42:50, а потребитель получил его в 18:42:55. То есть ровно через 5 секунд, так как мы и планировали.

Вот в целом и все, чем я хотел с вами поделиться. 

Полный исходный код потребителя и отправителя можно посмотреть здесь.

А как бы вы решили такую задачу? Жду ваших комментариев.


ссылка на оригинал статьи https://habr.com/ru/articles/873790/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *