Привет, Хабр!
Если бы мне сказали, что я однажды заменю привычный Python или Scala для работы с данными на Rust, я бы, пожалуй, ухмыльнулся и продолжил привычное дело. Но времена меняются, и Rust вполне уверенно пробивает себе дорогу в мир больших данных. Сегодня я расскажу вам о трех фреймворках, которые делают Rust конкурентом в обработке данных.
И первый фреймворк – Polars.
Polars
В отличие от pandas, где нагрузка на процессор и память может быть ощутима, Polars использует фичи Rust для параллельных вычислений и оптимизированного использования ресурсов.
Главные преимущества Polars:
-
Быстрая обработка больших данных: Polars написан на Rust и использует многопоточность.
-
Поддержка многомерных данных: Polars поддерживает работу с многомерными массивами.
-
Ленивая и физическая вычислительная модель: Polars поддерживает ленивые вычисления
-
Работа с различными форматами данных: Поддержка CSV, JSON, Parquet, IPC, Avro и других форматов данных.
Примеры использования
Чтение и обработка CSV-файла
Загрузим данные из CSV-файла, отфильтруем строки и выполним групповую агрегацию:
use polars::prelude::*; use std::fs::File; fn main() -> Result<()> { // чтение CSV-файла let file = File::open("data.csv")?; let df = CsvReader::new(file) .infer_schema(None) .has_header(true) .finish()?; // фильтрация данных по условию let filtered_df = df .lazy() .filter(col("column_name").gt(lit(10))) .collect()?; // группировка и агрегация let result_df = filtered_df .lazy() .groupby([col("group_column")]) .agg([col("value_column").mean()]) .collect()?; println!("{:?}", result_df); Ok(()) }
Создание DataFrame и выполнение операций
Создадим DF и выполним операции сложения и фильтрации.
use polars::prelude::*; fn main() -> Result<()> { // создание DataFrame let df = df![ "column1" => &[1, 2, 3, 4, 5], "column2" => &[10, 20, 30, 40, 50] ]?; // добавление нового столбца с результатом сложения let df = df.lazy() .with_column((col("column1") + col("column2")).alias("sum")) .collect()?; // фильтрация строк, где сумма больше 30 let filtered_df = df.lazy() .filter(col("sum").gt(lit(30))) .collect()?; println!("{:?}", filtered_df); Ok(()) }
Ленивые вычисления и работа с JSON
use polars::prelude::*; fn main() -> Result<()> { let json_data = r#" [ {"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}, {"name": "Charlie", "age": 35} ] "#; // чтение JSON данных let df = JsonReader::new(json_data.as_bytes()).finish()?; // ленивые вычисления: фильтрация и вычисление среднего возраста let result = df.lazy() .filter(col("age").gt(lit(25))) .select([col("age").mean()]) .collect()?; println!("{:?}", result); Ok(()) }
Подробне с Polars можно ознакомиться здесь.
Arroyo
Arroyo — это распределенный движок потоковой обработки, ориентированный на stateful вычисления с поддержкой как ограниченных, так и неограниченных потоков данных.
Arroyo разработан с использованием модели Dataflow, что позволяет управлять состояниями потоков данных, делая возможными различные сложные вычисления, такие как оконные агрегации, join-операции и многое другое. Весь этот функционал так же реализован на основе Rust.
Примеры использования Arroyo
Реализуем базовую настройку для обработки потока событий и подсчет количества событий в окне времени:
use arroyo::pipeline::Pipeline; use arroyo::window::TumblingWindow; fn main() { // инициализируем конвейер обработки данных let mut pipeline = Pipeline::new(); // источник данных let source = pipeline.add_source("source_name"); // применяем оконную функцию с временем окна в 5 минут let windowed = source.window(TumblingWindow::minutes(5)) .count(); // выводим результат в консоль pipeline.add_sink(windowed, |result| println!("Количество событий: {:?}", result)); // запуск конвейера pipeline.run(); }
Теперь реализуем stateful обработку, где нужно будет отслеживать состояние между событиями, например, вычисление среднего значения на основе предыдущих данных:
use arroyo::state::StatefulOperator; use arroyo::pipeline::Pipeline; struct AverageState { sum: f64, count: u64, } impl StatefulOperator for AverageState { type Input = f64; type Output = f64; fn process(&mut self, value: Self::Input) -> Option<Self::Output> { self.sum += value; self.count += 1; Some(self.sum / self.count as f64) } } fn main() { let mut pipeline = Pipeline::new(); // инициализируем источник данных let source = pipeline.add_source("numeric_data"); // применяем stateful операцию для вычисления среднего значения let averaged = source.stateful_operator(AverageState { sum: 0.0, count: 0 }); // отправляем результат в консоль pipeline.add_sink(averaged, |avg| println!("Среднее значение: {:?}", avg)); // запуск конвейера pipeline.run(); }
Теперь рассмотрим как Arroyo может использоваться для более сложных задач, например объединение нескольких потоков данных и выполнение оконных агрегаций:
use arroyo::pipeline::Pipeline; use arroyo::window::SlidingWindow; fn main() { let mut pipeline = Pipeline::new(); // инициализация двух источников данных let source1 = pipeline.add_source("source1"); let source2 = pipeline.add_source("source2"); // оконные операции на двух потоках данных let windowed1 = source1.window(SlidingWindow::minutes(10)).sum(); let windowed2 = source2.window(SlidingWindow::minutes(10)).sum(); // join двух потоков данных по ключу let joined = windowed1.join(windowed2, |key1, key2| key1 == key2); // обработка результата pipeline.add_sink(joined, |result| println!("Join result: {:?}", result)); // запуск конвейера pipeline.run(); }
Подробнее с Arryo можно ознакомиться здесь.
Timber
Timber — это простой логгер, разработанный для приложений, работающих в условиях многопоточности. Его основное назначение — упрощение процесса логирования в параллельных задачах. Timber позволяет вести логи как в стандартный вывод, так и в указанный файл.
Основные фичи Timber:
-
Timber поддерживает уровни логирования, которые можно настроить через макросы.
-
По умолчанию Timber выводит логи в stdout, но может быть легко перенастроен на запись в файл.
Пример использования Timber в приложении, которое может переключаться между режимами отладки и релиза, изменяя уровни логирования:
#[macro_use(timber)] use timber; #[cfg(debug)] pub mod level { pub const ERR: i32 = 1; pub const DEB: i32 = 2; pub const INF: i32 = 7; } #[cfg(not(debug))] pub mod level { pub const ERR: i32 = 1; pub const DEB: i32 = 0; pub const INF: i32 = 3; } // макросы для упрощения логирования macro_rules! log_err{($($arg:tt)*) => {timber!($crate::level::ERR, "ERR", $($arg)*)}} macro_rules! log_deb{($($arg:tt)*) => {timber!($crate::level::DEB, "DEB", $($arg)*)}} macro_rules! log_inf{($($arg:tt)*) => {timber!($crate::level::INF, "INF", $($arg)*)}} fn main() { timber::init("log.txt").unwrap(); // инициализация логирования в файл log_err!("Ошибка! Этот лог будет виден всегда."); log_deb!("Отладка. Этот лог виден только в режиме отладки."); log_inf!("Информация. Этот лог будет виден и в релизе, и в отладке."); }
Можно определить константы уровней логов ERR, DEB, INF, и компилятор будет игнорировать ненужные строки в релизной сборке.
Больше практических инструментов коллеги из OTUS рассматривают в рамках практических онлайн-курсов от экспертов рынка. Подробнее ознакомиться с курсами можно в каталоге.
ссылка на оригинал статьи https://habr.com/ru/articles/839390/
Добавить комментарий