Привет, Хабр!
Сегодня мы рассмотрим, как реализовать так называемую event-driven архитектуру с использованием Rust.
Архитектура на основе событий (event-driven architecture, EDA) — это подход к созданию систем, где взаимодействие между компонентами системы происходит с помощью событий. Все это позволяет развязывать компоненты друг от друга и повышать их независимость, что, в свою очередь, увеличивает масштабируемость и гибкость системы.
Как работает EDA?
-
События: Основные данные или действия, которые происходят в системе, например, нажатие кнопки или завершение загрузки файла.
-
Производители: Компоненты, которые создают события и отправляют их в систему.
-
Потребители: Компоненты, которые подписаны на события и реагируют на них.
-
Брокеры событий: Инструменты или системы, которые управляют передачей событий между производителями и потребителями.
С развитием экосистемы Rust появились хорошие инструменты для работы с архитектурой на основе событий, такие как:
-
Tokio: асинхронная платформа для работы с сетями.
-
Actix: высокопроизводительный фреймворк для создания акторных систем.
-
async-std: асинхронный стандарт для работы с Rust.
Установка и настройка среды
-
Установим Rust:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -
Установим необходимые библиотеки:
-
tokio -
async-std -
actix
-
Простая система на основе событий с использованием Tokio
Создадим простое приложение, которое будет отправлять и обрабатывать события с помощью Tokio.
cargo new event_driven_example cd event_driven_example
Добавим зависимости в Cargo.toml:
[dependencies] tokio = { version = "1.0", features = ["full"] }
А теперь напишем сам код:
use tokio::sync::mpsc; use tokio::task; #[tokio::main] async fn main() { // Создаем канал для передачи сообщений let (tx, mut rx) = mpsc::channel(32); // Запускаем асинхронную задачу для обработки событий task::spawn(async move { while let Some(event) = rx.recv().await { println!("Обработано событие: {}", event); } }); // Генерация событий for i in 1..=10 { tx.send(format!("Событие {}", i)).await.unwrap(); } }
Здесь мы:
-
Создали канал для передачи сообщений между производителем и потребителем.
-
Запустили асинхронную задачу для обработки событий с помощью
tokio::task::spawn. -
Генерируем события и отправляем их в канал.
Использование Actix для акторной модели
Теперь рассмотрим использование Actix, чтобы создать более сложную систему на основе акторов.
Добавим зависимости в Cargo.toml:
[dependencies] actix = "0.12" actix-web = "4.0.0-beta.8" serde = { version = "1.0", features = ["derive"] }
Создадим акторы:
use actix::prelude::*; use serde::{Serialize, Deserialize}; #[derive(Message, Serialize, Deserialize)] #[rtype(result = "()")] struct Event { id: u32, message: String, } struct EventProducer; impl Actor for EventProducer { type Context = Context<Self>; } impl Handler<Event> for EventProducer { type Result = (); fn handle(&mut self, event: Event, _: &mut Context<Self>) { println!("Произведено событие: {} - {}", event.id, event.message); } } struct EventConsumer; impl Actor for EventConsumer { type Context = Context<Self>; } impl Handler<Event> for EventConsumer { type Result = (); fn handle(&mut self, event: Event, _: &mut Context<Self>) { println!("Получено событие: {} - {}", event.id, event.message); } } #[actix::main] async fn main() { let producer = EventProducer.start(); let consumer = EventConsumer.start(); for i in 1..=10 { let event = Event { id: i, message: format!("Сообщение {}", i), }; producer.do_send(event.clone()); consumer.do_send(event); } }
Объяснение:
-
Создаем акторов
EventProducerиEventConsumer, которые обрабатывают события. -
Определяем структуру
Eventи реализуем для нее сообщениеMessage. -
Запускаем акторов и передаем им события с помощью
do_send.
Реализация брокера событий
Брокеры событий могут быть хороши для управления маршрутизацией событий между различными компонентами системы. Можно создать простой брокер с использованием библиотеки tokio.
Добавлим зависимости в Cargo.toml:
[dependencies] tokio = { version = "1.0", features = ["full"] }
Создадим брокер:
use tokio::sync::mpsc; use tokio::task; struct EventBroker { sender: mpsc::Sender<String>, receiver: mpsc::Receiver<String>, } impl EventBroker { fn new(buffer_size: usize) -> Self { let (sender, receiver) = mpsc::channel(buffer_size); EventBroker { sender, receiver } } async fn start(&mut self) { while let Some(event) = self.receiver.recv().await { println!("Брокер обработал событие: {}", event); } } async fn send_event(&self, event: String) { self.sender.send(event).await.unwrap(); } } #[tokio::main] async fn main() { let mut broker = EventBroker::new(32); task::spawn(async move { broker.start().await; }); for i in 1..=10 { broker.send_event(format!("Событие {}", i)).await; } }
Здесь мы:
-
Создали
EventBrokerс каналом для передачи событий. -
Запутили брокер в асинхронной задаче и обрабатываем входящие события.
-
Отправляем события в брокер с помощью
send_event.
Оптимизация и масштабируемость
Kafka и NATS
Apache Kafka — распределенная система для обработки потоков данных в реальном времени.
NATS — высокопроизводительная система обмена сообщениями с поддержкой pub/sub и request/reply.
[dependencies] tokio = { version = "1.0", features = ["full"] } rdkafka = "0.29" nats = "0.19"
// пример подключения к Kafka use rdkafka::config::ClientConfig; use rdkafka::producer::{FutureProducer, FutureRecord}; async fn produce_kafka_event() { let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", "localhost:9092") .create() .unwrap(); producer.send( FutureRecord::to("my-topic") .payload("Это сообщение для Kafka") .key("ключ"), 0, ).await.unwrap(); }
// пример подключения к NATS use nats::asynk::Connection; async fn publish_nats_event() { let nc = Connection::connect("localhost:4222").await.unwrap(); nc.publish("events", "Это сообщение для NATS").await.unwrap(); }
Примеры
Допустим, нужно обрабатывать большое количество событий в реальном времени и сохранять результаты в базе данных.
Для этого мы будем использовать Kafka для приема и обработки событий и Rust для обработки данных и записи результатов в базу данных:
use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::Message; use tokio_postgres::{NoTls, Client}; async fn process_events() { let consumer: StreamConsumer = ClientConfig::new() .set("bootstrap.servers", "localhost:9092") .set("group.id", "my-group") .create() .unwrap(); let (client, connection) = tokio_postgres::connect("host=localhost user=postgres", NoTls).await.unwrap(); tokio::spawn(async move { if let Err(e) = connection.await { eprintln!("Ошибка соединения: {}", e); } }); consumer.subscribe(&["my-topic"]).unwrap(); while let Some(message) = consumer.recv().await.unwrap() { let payload = match message.payload_view::<str>() { Some(Ok(text)) => text, Some(Err(e)) => { eprintln!("Ошибка декодирования сообщения: {:?}", e); continue; } None => continue, }; println!("Получено сообщение: {}", payload); client.execute("INSERT INTO events (data) VALUES ($1)", &[&payload]).await.unwrap(); } }
Подробнее с применяемыми библиотеками можно ознакомиться по гиперссылкам:
-
Tokio — асинхронная платформа для работы с сетями.
-
Actix — высокопроизводительный фреймворк для создания акторных систем.
-
async-std — асинхронный стандарт для работы с Rust.
Больше практических навыков по архитектуре приложений вы можете получить в рамках практических онлайн-курсов от экспертов отрасли.
ссылка на оригинал статьи https://habr.com/ru/articles/834340/
Добавить комментарий