Основы обработки асинхронных событий с помощью Rx в C#

от автора

Привет, Хабр!

Reactive Extensions, известные также как Rx.NET, представляют собой библиотеку для .NET, предназначенную для композиции асинхронных и событийно-ориентированных программ с помощью наблюдаемых последовательностей и операторов запросов в стиле LINQ.

Основная фича Rx заключается в управлении асинхронными данными так, как если бы они были синхронными коллекциями.

В этой статье мы рассмотрми, как работать с Rx в C#.

Немного теории

Observables – это основа Rx, представляющая потоки данных, которые можно наблюдать. Observables могут генерировать данные с течением времени, будь то из массивов, событий или тех же асинхронных запросов.

Observers – объекты, которые подписываются на Observables и реагируют на данные, поступающие из них. Observer реализует три метода, которые вызываются соответственно при получении данных, ошибки или завершении потока. С помощью метода Subscribe, Observer подключается к Observable и начинает получать уведомления.

Операторы выполняю основную роль манипуляции и управлении потоками данных. Они позволяют трансформировать, фильтровать, комбинировать и даже создавать новые потоки на основе существующих.

Синтаксис Rx

Для создания Observable последовательностей в Rx используется метод Observable.Create. Этот метод позволяет определить логику создания потока данных, которая будет исполняться при подписке на Observable:

IObservable<int> observable = Observable.Create(observer => {     observer.OnNext(1);     observer.OnNext(2);     observer.OnNext(3);     observer.OnCompleted();     return Disposable.Empty; });

Операторы:

Select:

var observable = Observable.Range(1, 5); var selected = observable.Select(x => x * 10); selected.Subscribe(Console.WriteLine); // вывод: 10, 20, 30, 40, 50

Where:

var observable = Observable.Range(1, 10); var filtered = observable.Where(x => x % 2 == 0); filtered.Subscribe(Console.WriteLine); // вывод: 2, 4, 6, 8, 10

Aggregate:

var observable = Observable.Range(1, 5); var aggregated = observable.Aggregate((acc, x) => acc + x); aggregated.Subscribe(Console.WriteLine); // вывод: 15

Работа с множественными источниками данных:

Merge:

var first = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3); var second = Observable.Interval(TimeSpan.FromSeconds(2)).Take(2); var merged = Observable.Merge(first, second); merged.Subscribe(Console.WriteLine); // вывод: 0, 0, 1, 1, 2

Concat:

var first = Observable.Range(1, 3); var second = Observable.Range(4, 3); var concatenated = Observable.Concat(first, second); concatenated.Subscribe(Console.WriteLine); // вывод: 1, 2, 3, 4, 5, 6

Zip:

var numbers = Observable.Range(1, 3); var letters = Observable.From(new[] {'a', 'b', 'c'}); var zipped = Observable.Zip(numbers, letters, (n, l) => $"{n}{l}"); zipped.Subscribe(Console.WriteLine); // вывод: 1a, 2b, 3c

Управление памятью и избежание утечек с помощью Dispose

Часто при работе с Rx используют Dispose для предотвращения утечек памяти. Когда вы подписываетесь на Observable, Rx.NET возвращает объект IDisposable, который нужно корректно утилизировать после окончания использования подписки. Это используют в проектах, где подписки могут динамически создаваться и удаляться.

var subscription = observable.Subscribe(item => Console.WriteLine(item)); // после завершения использования подписки, не забываем освободить ресурсы subscription.Dispose();

Примеры использования

Обработка кнопочных событий в пользовательском интерфейсе

Допустим, есть форма с кнопкой, и хочется реагировать на клики по этой кнопке, но только если клики происходят с интервалом не менее одной секунды, чтобы избежать двойного клика:

// подключение Rx и Windows Forms using System; using System.Windows.Forms; using System.Reactive.Linq;  public class ReactiveForm : Form {     private Button myButton;      public ReactiveForm()     {         myButton = new Button { Text = "Click me!" };         Controls.Add(myButton);          // создание потока событий клика         var clicks = Observable.FromEventPattern(             h => myButton.Click += h,             h => myButton.Click -= h);          // фильтрация событий с интервалом в 1 секунду         clicks             .Throttle(TimeSpan.FromSeconds(1))             .Subscribe(_ => Console.WriteLine("Button clicked!"));     } }

Комбинирование данных из нескольких источников

Представим, что есть два источника данных, один из которых отправляет цены товаров, а другой — информацию о скидках. Нужно комбинировать эти потоки для расчета окончательной цены:

IObservable<decimal> prices = Observable.Return(100m); // пример потока цен IObservable<decimal> discounts = Observable.Return(0.1m); // пример потока скидок  var finalPrice = prices.Zip(discounts, (price, discount) => price * (1 - discount)); finalPrice.Subscribe(price => Console.WriteLine($"Final price: {price}"));

Отслеживание состояния кнопки с задержкой

Если хочется отслеживать состояние кнопки, но с задержкой, чтобы предотвратить реакцию на случайные или кратковременные нажатия:

var buttonStates = Observable.FromEventPattern<StateChangedEventArgs>(button, "StateChanged");  buttonStates     .Select(evt => evt.EventArgs.NewState)     .DistinctUntilChanged() // игнорирование повторяющихся состояний     .Throttle(TimeSpan.FromMilliseconds(500)) // задержка для стабилизации состояния     .Subscribe(state => Console.WriteLine($"Button state stabilized to: {state}"));

Реактивное управление потоками данных API

Используем Rx для управления асинхронными вызовами API для получения погоды. Пример того, как можно организовать повторные запросы и обработку ошибок:

IObservable<string> weatherData = Observable.Interval(TimeSpan.FromMinutes(1))     .SelectMany(_ => Observable.FromAsync(() => GetWeatherAsync()))  // асинхронный вызов API     .Retry(3)  // повтор при ошибке до 3 раз     .Catch<string, Exception>(ex => Observable.Return("Error fetching weather data"));  weatherData.Subscribe(data => Console.WriteLine($"Current weather: {data}"),                       error => Console.WriteLine($"An error occurred: {error}"));

Reactive Extensions позволяет писать более чистый и реактивный код.

Статья подготовлена в преддверии старта специализации C# Developer. На странице специализации вы можете подробнее узнать о программе, а также зарегистрироваться на бесплатные вебинары.


ссылка на оригинал статьи https://habr.com/ru/articles/831968/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *