Привет, Хабр! Я Юрий Петров, руководитель отдела мобильной разработки в Friflex и автор телеграм-канала «Мобильный разработчик».
В этой статье хотел бы поделиться с вами опытом работы с брокером сообщений RabbitMQ из Dart-кода.
Разберу вопросы:
-
Как установить и запустить контейнер с RabbitMQ
-
Как настроить RabbitMQ.
-
Как создать Producer (отправителя) на Flutter.
-
Как создать Consumer (потребителя) на Flutter.
Давайте представим, что вы пишете маленький сервис на Dart, задача которого — отправить миллион сообщений. Не важно, каким способом — с помощью пушей или любого другого сервиса доставки. Но есть условие, что сообщения должны отправляться не все сразу, а по некоторому алгоритму. Например, ровно в 12 дня или, если у пользователя ночь, то только в дневное время. На самом деле не важен алгоритм, главное — понять, что у нас есть некая задержка отправки сообщений.
Есть простой способ это организовать. Например, мы будем использовать для пула сообщений задержку:
await Future.delayed(Duration(hours: 2));
По окончании задержки система отправит сообщения через два часа после получения. И в целом это будет работать, но это не самый лучший способ. Просто представьте, что таких сообщений может быть миллион, или, например, задержку отправки необходимо сделать на 10 дней, пока пользователь в отпуске. Понятно, что это не самый эффективный вариант.
Для решения таких проблем программисты придумали планировщики задач. Их называют брокеры сообщений, самый подходящий для нас — это RabbitMQ, так как у него есть нативный плагин Delayed Message Exchange, который отлично подходит для выполнения задач через установленное время.
Как работает сам брокер и что это вообще такое, я рассказывать не буду, так как это отдельная тема. Но вы можете почитать, например, здесь.
Для решения этой задачи, будем использовать следующие понятия:
Producer (Отправитель): приложение, которое будет отправлять сообщение в брокер. Сообщение должно быть доставлено потребителю не сразу, а через пять секунд.
Broker: сервис, который получает сообщение и хранит его в очереди до момента, когда нужно его отправить. Используем RabbitMQ.
Consumer (Потребитель): приложение, которое слушает очередь в брокере, получает готовые к доставке сообщения и отображает их.
Установка и запуск контейнера с RabbitMQ
И первое, с чего мы начнем, это запустим сервис RabbitMQ как докер-контейнер. Но сначала вы должны у себя установить Docker Desktop, перейти на сайт и выбрать необходимую сборку. Делается это очень просто, я думаю, труда у вас это не составит.
После того как установили docker в систему, проверяем в терминале командой:
docker --version
Вывод в консоль должен быть примерно такой:
Docker version 27.4.0, build bde2b89.
После того как разобрались с докером, нам необходимо запустить контейнер с RabbitMQ. И здесь у вас есть выбор. Вы можете использовать оригинальный контейнер, запустив команду в терминале, которая автоматически скачает и развернет докер-контейнер. И далее самим установить плагин Delayed Message Exchange.
docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
И можно использовать готовый образ с уже установленным плагином.
docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 heidiks/rabbitmq-delayed-message-exchange:4.0.2-management
Обе эти команды запускают контейнер с RabbitMQ, используя нужный докер-образ с параметрами:
-d
: Флаг -d
(или --detach
) указывает Docker запускать контейнер в фоновом режиме, не блокируя терминал. Контейнер работает как отдельный процесс.--name my-rabbit
: Этот параметр задает имя для контейнера.
-p 5672:5672 -p 15672:15672
: Пробрасывает порты между хост-машиной и контейнером.
После успешного выполнения команды проверьте, запущен ли у вас контейнер командой:
docker ps
В терминале вы должны увидеть запущенный контейнер в колонке NAMES.
Настройка RabbitMQ
Теперь нам необходимо настроить RabbitMQ. Для это переходим на веб-интерфейс — вводим адрес в браузере: http://localhost:15672/
Вы должны увидеть окно для ввода логина и пароля.
Веб интерфейс RabbitMQ — ввод логина и пароля
Вводим данные по умолчанию: username — guest и password — guest.
Попадаем на главное окно:
Веб интерфейс RabbitMQ — главное окно
Далее переходим во вкладку Exchanges и добавляем новый обменник.
Заполняем поля:
-
Name: delayed_exchange.
-
Type: выбираем x-delayed-message. Если в списке нет x-delayed-message, значит, плагин не установлен — проверьте, что все установлено и включено.
-
Arguments: добавляем ключ:
-
key: x-delayed-type
-
value: direct.
-
Веб интерфейс RabbitMQ — настройки Exchanges
Нажимаем на кнопку Add exchange. Убеждаемся, что в списке появился delayed_exchange.
Веб интерфейс RabbitMQ — проверка
Осталось настроить очередь для сообщений. Для этого переходим на вкладку Queues.
Добавляем новую очередь. Заполняем поля:
-
Name: messages_queue.
-
Durability: Durable.
Нажимаем Add queue.
Веб интерфейс RabbitMQ — настройки Queues
Отлично, у нас есть настроенный exchange и queue. Осталось их соединить.
Для этого переходим в созданную очередь.
Веб интерфейс RabbitMQ — переход на Queues
Далее на странице очереди messages_queue находим секцию Bindings.
Заполняем поля:
-
From exchange: delayed_exchange
-
Routing key: messages_queue
Нажимаем кнопку Bind.
Веб интерфейс RabbitMQ — пример связки
Таким образом, когда мы публикуем в delayed_exchange, сообщение придет в messages_queue, после того как истечет задержка x-delay.
Если вам не нравится работать в веб-интерфейсе, или вы хотели бы этот процесс автоматизировать, можно это все сделать через терминал командами:
Подключаемся к контейнеру my-rabbit:
docker exec -it my-rabbit bash
Объявляем exchange внутри контейнера:
rabbitmqadmin declare exchange \ name=delayed_exchange \ type=x-delayed-message \ arguments='{"x-delayed-type":"direct"}'
Создаем очередь:
rabbitmqadmin declare queue name=messages_queue durable=true
Создаем связку:
rabbitmqadmin declare binding source=delayed_exchange destination=messages_queue routing_key="messages_queue"
Все готово, теперь нам необходимо реализовать Producer на Flutter.
Создание Producer (отправителя) на Flutter
Для реализации отправителя, создадим простой проект на Flutter с одной кнопкой, которая будет отправлять сообщение с нужной нам задержкой. Для работы с брокером будем использовать библиотеку dart_amqp
Добавляем библиотеку в проект командой:
flutter pub add dart_amqp
Создаем функцию, чтобы создать клиент для работы с RabbitMQ:
_initClient
Future<Exchange> _initClient() async { try { /// Настройки подключения final settings = ConnectionSettings( // Если у вас RabbitMQ запущен на на локальном компьютере // Укажите нужный IP-адрес или hostname host: 'localhost', // Порт port: 5672, // Данные для входа в RabbitMQ authProvider: const PlainAuthenticator('guest', 'guest'), ); /// Создаем клиента для подключения к RabbitMQ Client client = Client(settings: settings); // Подключаемся await client.connect(); // Создаем канал, по которому будем отправлять сообщения final channel = await client.channel(); // Декларируем exchange. Если он уже создан, то повторная декларация // должна совпадать по параметрам. // Если не совпадает — будет ошибка. // Если exchange не существует, то он будет создан final exchange = await channel.exchange( 'delayed_exchange', // Используем ExchangeType типа x-delayed-message ExchangeType.custom('x-delayed-message'), durable: true, arguments: { 'x-delayed-type': 'direct', }, ); log('Успешно подключились к RabbitMQ'); return exchange; } on Object catch (error, stackTrace) { log('Ошибка подключения к RabbitMQ', error: error, stackTrace: stackTrace); rethrow; } }
Далее создаем простой виджет с одной кнопкой, которая будет отправлять сообщение в брокер. Это будет простое сообщение с датой отправки.
_sendMessage
void _sendMessage() { // Публикуем сообщение. Ставим задержку 5000 мс (5 секунд) для примера // Через 5 секунд сообщение будет доставлено в очередь из Exchange x-delayed-message final int delayMs = 5000; // Обязательно передаем задержку в хедере final headers = {'x-delay': delayMs}; // Создаем сообщение которое будем отправлять, можно отправить объект final message = 'Привет, я отложенное сообщение, которое было отправлено в ${DateTime.now()}'; // Настройки сообщения final properties = MessageProperties()..headers = headers; // Ключ очереди final routingKey = 'messages_queue'; // Отправляем exchange.publish( message, routingKey, properties: properties, ); log('Отправили сообщение с задержкой $delayMs мс'); }
Полный исходный код:
main.dart
import 'dart:developer'; import 'package:dart_amqp/dart_amqp.dart'; import 'package:flutter/material.dart'; void main() async { /// Инициализируем клиента final exchange = await _initClient(); runApp(MyApp(exchange: exchange)); } Future<Exchange> _initClient() async { try { /// Настройки подключения final settings = ConnectionSettings( // Если у вас RabbitMQ запущен на не локальном компьютере // укажите нужный IP-адрес или hostname host: 'localhost', // Порт port: 5672, // Данные для входа в RabbitMQ authProvider: const PlainAuthenticator('guest', 'guest'), ); /// Создаем клиента для подключения к RabbitMQ Client client = Client(settings: settings); // Подключаемся await client.connect(); // Создаем канал, по которому будем отправлять сообщения final channel = await client.channel(); // Декларируем exchange. Если он уже создан, то повторная декларация // должна совпадать по параметрам. // Если не совпадает — будет ошибка. // Если exchange не существует, то он будет создан final exchange = await channel.exchange( 'delayed_exchange', // Используем ExchangeType типа x-delayed-message ExchangeType.custom('x-delayed-message'), durable: true, arguments: { 'x-delayed-type': 'direct', }, ); log('Успешно подключились к RabbitMQ'); return exchange; } on Object catch (error, stackTrace) { log( 'Ошибка подключения к RabbitMQ', error: error, stackTrace: stackTrace, ); rethrow; } } class MyApp extends StatelessWidget { // Exchange для публикаций final Exchange exchange; const MyApp({super.key, required this.exchange}); @override Widget build(BuildContext context) { return MaterialApp( home: Scaffold( appBar: AppBar(title: const Text('Flutter Producer Пример')), body: Center( child: ElevatedButton( onPressed: _sendMessage, child: Text('Отправить'), ), ), ), ); } void _sendMessage() { // Публикуем сообщение. Ставим задержку 5000 мс (5 секунд) для примера // Через 5 секунд сообщение будет доставлено в очередь из Exchange x-delayed-message final int delayMs = 5000; // Обязательно передаем задержку в хедере final headers = {'x-delay': delayMs}; // Создаем сообщение которое будем отправлять, можно отправить объект final message = 'Привет, я отложенное сообщение, которое было отправлено в ${DateTime.now()}'; // Настройки сообщения final properties = MessageProperties()..headers = headers; // Ключ очереди final routingKey = 'messages_queue'; // Отправляем exchange.publish( message, routingKey, properties: properties, ); log('Отправили сообщение с задержкой $delayMs мс'); } }
После запуска приложения отправляем сообщение в брокер.
Producer — внешний вид
После отправки переходим в web-интерфейс RabbitMQ, далее в Exchange: delayed_exchange. На графике вы должны увидеть, что сообщение было добавлено в exchange. И если перейти в очередь, можно увидеть, что данные ушли и в очередь.
Веб интерфейс RabbitMQ — графики delayed_exchange и queues
Отлично, теперь можно написать потребителя.
Создание Consumer (потребителя) на Flutter
Как и в случае с отправителем, создаем новое Flutter-приложение, и добавляем библиотеку dart_amqp. Создаем метод для создания потребителя с тегом ‘my_consumer
‘.
_initConsumer()
Future<Consumer> _initConsumer() async { // Инициализируем Client final client = Client( settings: ConnectionSettings( // Так как мы тестируем, используем локальный хост host: 'localhost', port: 5672, authProvider: const PlainAuthenticator('guest', 'guest'), ), ); // Создаем клиент final channel = await client.channel(); // Создаем очередь final queue = await channel.queue( 'messages_queue', durable: true, ); // Создаем потребителя, с тегом my_consumer final consumer = await queue.consume( consumerTag: 'my_consumer', ); return consumer; }
Далее создаем простой StatefulWidget
с текстовым полем по центру, и в методе initState()
создаем подписку на поток из RabbitMQ:
main.dart
import 'dart:async'; import 'package:dart_amqp/dart_amqp.dart'; import 'package:flutter/material.dart'; void main() async { // Инициализируем Consumer final consumer = await _initConsumer(); runApp(MyApp(consumer: consumer)); } Future<Consumer> _initConsumer() async { // Инициализируем Client final client = Client( settings: ConnectionSettings( // Так как мы тестируем, используем локальный хост host: 'localhost', port: 5672, authProvider: const PlainAuthenticator('guest', 'guest'), ), ); // Создаем клиент final channel = await client.channel(); // Создаем очередь final queue = await channel.queue( 'messages_queue', durable: true, ); // Создаем потребителя, с тегом my_consumer final consumer = await queue.consume( consumerTag: 'my_consumer', ); return consumer; } class MyApp extends StatefulWidget { const MyApp({super.key, required this.consumer}); final Consumer consumer; @override State<MyApp> createState() => _MyAppState(); } class _MyAppState extends State<MyApp> { String message = 'Нет данных'; late final StreamSubscription streamSubscription; @override initState() { super.initState(); // Подписываемся на получение сообщений streamSubscription = widget.consumer.listen((AmqpMessage amqpMessage) { message = 'Полученное сообщение: ${amqpMessage.payloadAsString} \nВремя получения: ${DateTime.now().toIso8601String()}'; setState(() {}); }); } @override void dispose() { streamSubscription.cancel(); super.dispose(); } @override Widget build(BuildContext context) { return MaterialApp( home: Scaffold( appBar: AppBar( title: const Text( 'Flutter Consumer Пример', ), ), body: Center(child: Text(message)), ), ); } }
Запускаем потребителя, и если подключение прошло успешно, можно перейти в очередь и убедиться, что мы подключены как потребители.
Веб интерфейс RabbitMQ — подключенные потребители
Нажимаем на кнопку «Отправить» на отправителе.
Пример работы отправителя и получателя.
Обратите внимание, что сообщение было отправлено в 18:42:50, а потребитель получил его в 18:42:55. То есть ровно через 5 секунд, так как мы и планировали.
Вот в целом и все, чем я хотел с вами поделиться.
Полный исходный код потребителя и отправителя можно посмотреть здесь.
А как бы вы решили такую задачу? Жду ваших комментариев.
ссылка на оригинал статьи https://habr.com/ru/articles/873790/
Добавить комментарий