Фреймворки для работа с данными на Rust. Часть 1

от автора

Привет, Хабр!

Если бы мне сказали, что я однажды заменю привычный Python или Scala для работы с данными на Rust, я бы, пожалуй, ухмыльнулся и продолжил привычное дело. Но времена меняются, и Rust вполне уверенно пробивает себе дорогу в мир больших данных. Сегодня я расскажу вам о трех фреймворках, которые делают Rust конкурентом в обработке данных.

И первый фреймворк – Polars.

Polars

В отличие от pandas, где нагрузка на процессор и память может быть ощутима, Polars использует фичи Rust для параллельных вычислений и оптимизированного использования ресурсов.

Главные преимущества Polars:

  • Быстрая обработка больших данных: Polars написан на Rust и использует многопоточность.

  • Поддержка многомерных данных: Polars поддерживает работу с многомерными массивами.

  • Ленивая и физическая вычислительная модель: Polars поддерживает ленивые вычисления

  • Работа с различными форматами данных: Поддержка CSV, JSON, Parquet, IPC, Avro и других форматов данных.

Примеры использования

Чтение и обработка CSV-файла

Загрузим данные из CSV-файла, отфильтруем строки и выполним групповую агрегацию:

use polars::prelude::*; use std::fs::File;  fn main() -> Result<()> {     // чтение CSV-файла     let file = File::open("data.csv")?;     let df = CsvReader::new(file)         .infer_schema(None)         .has_header(true)         .finish()?;      // фильтрация данных по условию     let filtered_df = df         .lazy()         .filter(col("column_name").gt(lit(10)))         .collect()?;      // группировка и агрегация     let result_df = filtered_df         .lazy()         .groupby([col("group_column")])         .agg([col("value_column").mean()])         .collect()?;      println!("{:?}", result_df);      Ok(()) }

Создание DataFrame и выполнение операций

Создадим DF и выполним операции сложения и фильтрации.

use polars::prelude::*;  fn main() -> Result<()> {     // создание DataFrame     let df = df![         "column1" => &[1, 2, 3, 4, 5],         "column2" => &[10, 20, 30, 40, 50]     ]?;      // добавление нового столбца с результатом сложения     let df = df.lazy()         .with_column((col("column1") + col("column2")).alias("sum"))         .collect()?;      // фильтрация строк, где сумма больше 30     let filtered_df = df.lazy()         .filter(col("sum").gt(lit(30)))         .collect()?;      println!("{:?}", filtered_df);      Ok(()) }

Ленивые вычисления и работа с JSON

use polars::prelude::*;  fn main() -> Result<()> {     let json_data = r#"         [             {"name": "Alice", "age": 25},             {"name": "Bob", "age": 30},             {"name": "Charlie", "age": 35}         ]     "#;      // чтение JSON данных     let df = JsonReader::new(json_data.as_bytes()).finish()?;      // ленивые вычисления: фильтрация и вычисление среднего возраста     let result = df.lazy()         .filter(col("age").gt(lit(25)))         .select([col("age").mean()])         .collect()?;      println!("{:?}", result);      Ok(()) }

Подробне с Polars можно ознакомиться здесь.

Arroyo

Arroyo — это распределенный движок потоковой обработки, ориентированный на stateful вычисления с поддержкой как ограниченных, так и неограниченных потоков данных.

Arroyo разработан с использованием модели Dataflow, что позволяет управлять состояниями потоков данных, делая возможными различные сложные вычисления, такие как оконные агрегации, join-операции и многое другое. Весь этот функционал так же реализован на основе Rust.

Примеры использования Arroyo

Реализуем базовую настройку для обработки потока событий и подсчет количества событий в окне времени:

use arroyo::pipeline::Pipeline; use arroyo::window::TumblingWindow;  fn main() {     // инициализируем конвейер обработки данных     let mut pipeline = Pipeline::new();      // источник данных     let source = pipeline.add_source("source_name");      // применяем оконную функцию с временем окна в 5 минут     let windowed = source.window(TumblingWindow::minutes(5))                          .count();      // выводим результат в консоль     pipeline.add_sink(windowed, |result| println!("Количество событий: {:?}", result));      // запуск конвейера     pipeline.run(); }

Теперь реализуем stateful обработку, где нужно будет отслеживать состояние между событиями, например, вычисление среднего значения на основе предыдущих данных:

use arroyo::state::StatefulOperator; use arroyo::pipeline::Pipeline;  struct AverageState {     sum: f64,     count: u64, }  impl StatefulOperator for AverageState {     type Input = f64;     type Output = f64;      fn process(&mut self, value: Self::Input) -> Option<Self::Output> {         self.sum += value;         self.count += 1;         Some(self.sum / self.count as f64)     } }  fn main() {     let mut pipeline = Pipeline::new();      // инициализируем источник данных     let source = pipeline.add_source("numeric_data");      // применяем stateful операцию для вычисления среднего значения     let averaged = source.stateful_operator(AverageState { sum: 0.0, count: 0 });      // отправляем результат в консоль     pipeline.add_sink(averaged, |avg| println!("Среднее значение: {:?}", avg));      // запуск конвейера     pipeline.run(); }

Теперь рассмотрим как Arroyo может использоваться для более сложных задач, например объединение нескольких потоков данных и выполнение оконных агрегаций:

use arroyo::pipeline::Pipeline; use arroyo::window::SlidingWindow;  fn main() {     let mut pipeline = Pipeline::new();      // инициализация двух источников данных     let source1 = pipeline.add_source("source1");     let source2 = pipeline.add_source("source2");      // оконные операции на двух потоках данных     let windowed1 = source1.window(SlidingWindow::minutes(10)).sum();     let windowed2 = source2.window(SlidingWindow::minutes(10)).sum();      // join двух потоков данных по ключу     let joined = windowed1.join(windowed2, |key1, key2| key1 == key2);      // обработка результата     pipeline.add_sink(joined, |result| println!("Join result: {:?}", result));      // запуск конвейера     pipeline.run(); }

Подробнее с Arryo можно ознакомиться здесь.

Timber

Timber — это простой логгер, разработанный для приложений, работающих в условиях многопоточности. Его основное назначение — упрощение процесса логирования в параллельных задачах. Timber позволяет вести логи как в стандартный вывод, так и в указанный файл.

Основные фичи Timber:

  1. Timber поддерживает уровни логирования, которые можно настроить через макросы.

  2. По умолчанию Timber выводит логи в stdout, но может быть легко перенастроен на запись в файл.

Пример использования Timber в приложении, которое может переключаться между режимами отладки и релиза, изменяя уровни логирования:

#[macro_use(timber)] use timber;  #[cfg(debug)] pub mod level {     pub const ERR: i32 = 1;     pub const DEB: i32 = 2;     pub const INF: i32 = 7; }  #[cfg(not(debug))] pub mod level {     pub const ERR: i32 = 1;     pub const DEB: i32 = 0;     pub const INF: i32 = 3; }  // макросы для упрощения логирования macro_rules! log_err{($($arg:tt)*) => {timber!($crate::level::ERR, "ERR", $($arg)*)}} macro_rules! log_deb{($($arg:tt)*) => {timber!($crate::level::DEB, "DEB", $($arg)*)}} macro_rules! log_inf{($($arg:tt)*) => {timber!($crate::level::INF, "INF", $($arg)*)}}  fn main() {     timber::init("log.txt").unwrap(); // инициализация логирования в файл      log_err!("Ошибка! Этот лог будет виден всегда.");     log_deb!("Отладка. Этот лог виден только в режиме отладки.");     log_inf!("Информация. Этот лог будет виден и в релизе, и в отладке."); }

Можно определить константы уровней логов ERR, DEB, INF, и компилятор будет игнорировать ненужные строки в релизной сборке.

Больше практических инструментов коллеги из OTUS рассматривают в рамках практических онлайн-курсов от экспертов рынка. Подробнее ознакомиться с курсами можно в каталоге.


ссылка на оригинал статьи https://habr.com/ru/articles/839390/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *