Архитектура на основе событий в Rust

от автора

Привет, Хабр!

Сегодня мы рассмотрим, как реализовать так называемую event-driven архитектуру с использованием Rust.

Архитектура на основе событий (event-driven architecture, EDA) — это подход к созданию систем, где взаимодействие между компонентами системы происходит с помощью событий. Все это позволяет развязывать компоненты друг от друга и повышать их независимость, что, в свою очередь, увеличивает масштабируемость и гибкость системы.

Как работает EDA?

  1. События: Основные данные или действия, которые происходят в системе, например, нажатие кнопки или завершение загрузки файла.

  2. Производители: Компоненты, которые создают события и отправляют их в систему.

  3. Потребители: Компоненты, которые подписаны на события и реагируют на них.

  4. Брокеры событий: Инструменты или системы, которые управляют передачей событий между производителями и потребителями.

С развитием экосистемы Rust появились хорошие инструменты для работы с архитектурой на основе событий, такие как:

  • Tokio: асинхронная платформа для работы с сетями.

  • Actix: высокопроизводительный фреймворк для создания акторных систем.

  • async-std: асинхронный стандарт для работы с Rust.

Установка и настройка среды

  1. Установим Rust:

    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
  2. Установим необходимые библиотеки:

    • tokio

    • async-std

    • actix

Простая система на основе событий с использованием Tokio

Создадим простое приложение, которое будет отправлять и обрабатывать события с помощью Tokio.

cargo new event_driven_example cd event_driven_example

Добавим зависимости в Cargo.toml:

[dependencies] tokio = { version = "1.0", features = ["full"] }

А теперь напишем сам код:

use tokio::sync::mpsc; use tokio::task;  #[tokio::main] async fn main() {     // Создаем канал для передачи сообщений     let (tx, mut rx) = mpsc::channel(32);      // Запускаем асинхронную задачу для обработки событий     task::spawn(async move {         while let Some(event) = rx.recv().await {             println!("Обработано событие: {}", event);         }     });      // Генерация событий     for i in 1..=10 {         tx.send(format!("Событие {}", i)).await.unwrap();     } }

Здесь мы:

  1. Создали канал для передачи сообщений между производителем и потребителем.

  2. Запустили асинхронную задачу для обработки событий с помощью tokio::task::spawn.

  3. Генерируем события и отправляем их в канал.

Использование Actix для акторной модели

Теперь рассмотрим использование Actix, чтобы создать более сложную систему на основе акторов.

Добавим зависимости в Cargo.toml:

[dependencies] actix = "0.12" actix-web = "4.0.0-beta.8" serde = { version = "1.0", features = ["derive"] }

Создадим акторы:

use actix::prelude::*; use serde::{Serialize, Deserialize};  #[derive(Message, Serialize, Deserialize)] #[rtype(result = "()")] struct Event {     id: u32,     message: String, }  struct EventProducer;  impl Actor for EventProducer {     type Context = Context<Self>; }  impl Handler<Event> for EventProducer {     type Result = ();      fn handle(&mut self, event: Event, _: &mut Context<Self>) {         println!("Произведено событие: {} - {}", event.id, event.message);     } }  struct EventConsumer;  impl Actor for EventConsumer {     type Context = Context<Self>; }  impl Handler<Event> for EventConsumer {     type Result = ();      fn handle(&mut self, event: Event, _: &mut Context<Self>) {         println!("Получено событие: {} - {}", event.id, event.message);     } }  #[actix::main] async fn main() {     let producer = EventProducer.start();     let consumer = EventConsumer.start();      for i in 1..=10 {         let event = Event {             id: i,             message: format!("Сообщение {}", i),         };          producer.do_send(event.clone());         consumer.do_send(event);     } }

Объяснение:

  1. Создаем акторов EventProducer и EventConsumer, которые обрабатывают события.

  2. Определяем структуру Event и реализуем для нее сообщение Message.

  3. Запускаем акторов и передаем им события с помощью do_send.

Реализация брокера событий

Брокеры событий могут быть хороши для управления маршрутизацией событий между различными компонентами системы. Можно создать простой брокер с использованием библиотеки tokio.

Добавлим зависимости в Cargo.toml:

[dependencies] tokio = { version = "1.0", features = ["full"] }

Создадим брокер:

use tokio::sync::mpsc; use tokio::task;  struct EventBroker {     sender: mpsc::Sender<String>,     receiver: mpsc::Receiver<String>, }  impl EventBroker {     fn new(buffer_size: usize) -> Self {         let (sender, receiver) = mpsc::channel(buffer_size);         EventBroker { sender, receiver }     }      async fn start(&mut self) {         while let Some(event) = self.receiver.recv().await {             println!("Брокер обработал событие: {}", event);         }     }      async fn send_event(&self, event: String) {         self.sender.send(event).await.unwrap();     } }  #[tokio::main] async fn main() {     let mut broker = EventBroker::new(32);      task::spawn(async move {         broker.start().await;     });      for i in 1..=10 {         broker.send_event(format!("Событие {}", i)).await;     } }

Здесь мы:

  1. Создали EventBroker с каналом для передачи событий.

  2. Запутили брокер в асинхронной задаче и обрабатываем входящие события.

  3. Отправляем события в брокер с помощью send_event.

Оптимизация и масштабируемость

Kafka и NATS

Apache Kafka — распределенная система для обработки потоков данных в реальном времени.

NATS — высокопроизводительная система обмена сообщениями с поддержкой pub/sub и request/reply.

[dependencies] tokio = { version = "1.0", features = ["full"] } rdkafka = "0.29" nats = "0.19"
// пример подключения к Kafka use rdkafka::config::ClientConfig; use rdkafka::producer::{FutureProducer, FutureRecord};  async fn produce_kafka_event() {     let producer: FutureProducer = ClientConfig::new()         .set("bootstrap.servers", "localhost:9092")         .create()         .unwrap();      producer.send(         FutureRecord::to("my-topic")             .payload("Это сообщение для Kafka")             .key("ключ"),         0,     ).await.unwrap(); }
// пример подключения к NATS use nats::asynk::Connection;  async fn publish_nats_event() {     let nc = Connection::connect("localhost:4222").await.unwrap();      nc.publish("events", "Это сообщение для NATS").await.unwrap(); }

Примеры

Допустим, нужно обрабатывать большое количество событий в реальном времени и сохранять результаты в базе данных.

Для этого мы будем использовать Kafka для приема и обработки событий и Rust для обработки данных и записи результатов в базу данных:

use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::Message; use tokio_postgres::{NoTls, Client};  async fn process_events() {     let consumer: StreamConsumer = ClientConfig::new()         .set("bootstrap.servers", "localhost:9092")         .set("group.id", "my-group")         .create()         .unwrap();      let (client, connection) = tokio_postgres::connect("host=localhost user=postgres", NoTls).await.unwrap();      tokio::spawn(async move {         if let Err(e) = connection.await {             eprintln!("Ошибка соединения: {}", e);         }     });      consumer.subscribe(&["my-topic"]).unwrap();      while let Some(message) = consumer.recv().await.unwrap() {         let payload = match message.payload_view::<str>() {             Some(Ok(text)) => text,             Some(Err(e)) => {                 eprintln!("Ошибка декодирования сообщения: {:?}", e);                 continue;             }             None => continue,         };          println!("Получено сообщение: {}", payload);          client.execute("INSERT INTO events (data) VALUES ($1)", &[&payload]).await.unwrap();     } }

Подробнее с применяемыми библиотеками можно ознакомиться по гиперссылкам:

  1. Tokio — асинхронная платформа для работы с сетями.

  2. Actix — высокопроизводительный фреймворк для создания акторных систем.

  3. async-std — асинхронный стандарт для работы с Rust.


Больше практических навыков по архитектуре приложений вы можете получить в рамках практических онлайн-курсов от экспертов отрасли.


ссылка на оригинал статьи https://habr.com/ru/articles/834340/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *