Обзор библиотеки RxPY

от автора

Привет, Хабр!

Сегодня поговорим о хорошей библиотеке для управления потоками данных в Python – RxPY — реализации Reactive Extensions для нашего любимого языка. В версии 4.0.4 эта библиотека получила ряд улучшений, и сегодня мы разберем её основной функционал.

Основы RxPY

RxPY — это библиотека, реализующая принципы функционального реактивного программирования в Python. Она позволяет создавать и управлять асинхронными потоками данных, объединяя их, фильтруя и трансформируя с помощью цепочек операторов. Основные компоненты RxPY:

  • Observable: источник данных, который может выдавать события.

  • Observer: подписчик, который реагирует на события от Observable.

  • Операторы: функции, позволяющие трансформировать, фильтровать и комбинировать Observable.

Установить RxPY проще простого:

pip install reactivex

В версии 4.0.4 RxPY произошли значительные изменения:

  • Переименование модуля: теперь импортируем reactivex, а не rx.

  • Улучшенная типизация: добавлены аннотации типов для лучшей поддержки IDE.

  • Обновленная работа с операторами: использование метода pipe для цепочки операторов.

Если ты работал с RxPY v3, вот как все это дело интегрировать:

  • Изменение импорта: import rx -> import reactivex as rx.

  • Операторы: вместо observable.map() теперь используем observable.pipe(ops.map()).

  • Удалены устаревшие функции: некоторые старые операторы и методы были удалены или переименованы.

Создание Observable и работа с операторами

Создание Observable

just()

Создает Observable, который выдает единственное значение.

import reactivex as rx  observable = rx.just(42) observable.subscribe(lambda x: print(f"Значение: {x}"))

from_()

Преобразует итерабельный объект в Observable.

observable = rx.from_([1, 2, 3, 4, 5]) observable.subscribe(lambda x: print(f"Элемент: {x}"))

interval()

Выдает последовательность чисел с заданным интервалом времени.

import time from reactivex import interval  observable = interval(1)  # каждую секунду subscription = observable.subscribe(lambda x: print(f"Тик: {x}"))  time.sleep(5) subscription.dispose()

timer()

Выдает значение после заданной задержки.

from reactivex import timer  observable = timer(3)  # через 3 секунды observable.subscribe(lambda x: print("Таймер сработал!"))

Трансформация Observable

map()

Применяет функцию к каждому элементу.

from reactivex import operators as ops  observable.pipe(     ops.map(lambda x: x * x) ).subscribe(lambda x: print(f"Квадрат: {x}"))

flat_map()

Разворачивает вложенные Observable.

def duplicate(x):     return rx.from_([x, x*2, x*3])  observable.pipe(     ops.flat_map(duplicate) ).subscribe(lambda x: print(f"Значение: {x}"))

scan()

Аналог reduce, но выдает накопленный результат на каждой итерации.

observable.pipe(     ops.scan(lambda acc, x: acc + x, seed=0) ).subscribe(lambda x: print(f"Сумма: {x}"))

Фильтрация данных

filter()

Отбирает элементы, удовлетворяющие условию.

observable.pipe(     ops.filter(lambda x: x % 2 == 0) ).subscribe(lambda x: print(f"Четное число: {x}"))

debounce()

Игнорирует значения, если они поступают слишком быстро.

observable.pipe(     ops.debounce(0.5) ).subscribe(lambda x: print(f"Получено: {x}"))

distinct()

Пропускает только уникальные значения.

observable.pipe(     ops.distinct() ).subscribe(lambda x: print(f"Уникальное значение: {x}"))

Комбинирование Observable

merge()

Объединяет несколько Observable в один поток.

obs1 = rx.from_([1, 2, 3]) obs2 = rx.from_([4, 5, 6])  rx.merge(obs1, obs2).subscribe(lambda x: print(f"Элемент: {x}"))

zip()

Объединяет элементы нескольких Observable в кортежи.

obs1 = rx.from_([1, 2, 3]) obs2 = rx.from_(['a', 'b', 'c'])  rx.zip(obs1, obs2).subscribe(lambda x: print(f"Сочетание: {x}"))

combine_latest()

Выдает комбинацию последних элементов из каждого Observable.

obs1 = rx.interval(1) obs2 = rx.interval(1.5)  rx.combine_latest(obs1, obs2).subscribe(lambda x: print(f"Комбинация: {x}"))

Тестирование потоков данных

Существуют горячие и холодные Observable

  • Холодные Observable начинают выдавать данные с момента подписки.

  • Горячие Observable уже генерируют данные, независимо от подписчиков.

Пример холодного Observable:

def create_cold_observable(scheduler):     return rx.from_([1, 2, 3], scheduler=scheduler)  scheduler = reactivex.testing.TestScheduler() observable = create_cold_observable(scheduler) 

Пример горячего Observable:

def create_hot_observable(scheduler):     return scheduler.create_hot_observable(         reactivex.testing.ReactiveTest.on_next(150, 1),         reactivex.testing.ReactiveTest.on_next(210, 2),     )  scheduler = reactivex.testing.TestScheduler() observable = create_hot_observable(scheduler)

Использование TestScheduler:

from reactivex.testing import TestScheduler, ReactiveTest  def test_map_operator():     scheduler = TestScheduler()     xs = scheduler.create_hot_observable(         ReactiveTest.on_next(150, 1),         ReactiveTest.on_next(210, 2),         ReactiveTest.on_completed(300)     )      def create():         return xs.pipe(ops.map(lambda x: x * 10))      results = scheduler.start(create)     assert results.messages == [         ReactiveTest.on_next(210, 20),         ReactiveTest.on_completed(300)     ]

Тестирование с использованием Marbles

Marble-диаграммы позволяют визуализировать потоки данных.

from reactivex.testing import marbles_testing  def test_filter_operator():     with marbles_testing() as (start, cold, hot, exp):         source = cold('--1-2-3-4-5-|')         expected = exp('----2---4---|')          result = start(source.pipe(             ops.filter(lambda x: int(x) % 2 == 0)         ))          assert result == expected

Пару примеров применения RxPY

Интеграция с asyncio

RxPY хорошо сочетается с asyncio:

import asyncio  async def main():     loop = asyncio.get_event_loop()     observable = rx.interval(1).pipe(         ops.take(5)     )      observable.subscribe(         on_next=lambda x: print(f"Tick: {x}"),         on_error=lambda e: print(f"Error: {e}"),         on_completed=lambda: print("Completed"),         scheduler=rx.scheduler.AsyncIOScheduler(loop)     )      await asyncio.sleep(6)  asyncio.run(main())

Также RxPY может помочь при работе с очередями сообщений и кэшем в Redis:

import redis from reactivex import Subject  r = redis.Redis()  def listen_to_channel(channel):     pubsub = r.pubsub()     pubsub.subscribe(channel)     for message in pubsub.listen():         if message['type'] == 'message':             yield message['data']  channel_observable = rx.from_(listen_to_channel('my_channel'))  channel_observable.subscribe(lambda msg: print(f"Received: {msg}"))

Обработка событий в event-driven архитектуре:

event_subject = Subject()  def handle_event(event):     print(f"Handling event: {event}")  event_subject.pipe(     ops.filter(lambda e: e['type'] == 'click'),     ops.map(lambda e: e['payload']) ).subscribe(handle_event)  # Где-то в коде event_subject.on_next({'type': 'click', 'payload': {'x': 100, 'y': 200}}) event_subject.on_next({'type': 'hover', 'payload': {'x': 150, 'y': 250}})

Заключение

RxPY — это отличная находка для тех, кто хочет управлять асинхронными потоками данных по новому. Моя рекомендация: используй RxPY, если работаешь с большими объемами асинхронных данных или строишь event-driven системы. В таких проектах она раскроет весь свой потенциал.

У RxPY есть и свой порог вхождения. Если тебе нужно решить простую задачу с минимальным уровнем асинхронности, возможно, дефолтные библиотеки asyncio или потоки будут проще и быстрее в освоении. Но когда дело доходит до сложных и динамических систем — RxPY может уже понадобиться.

Подробнее с RxPY можете ознакомиться в их гите.


В завершение напоминаем про открытые уроки, которые пройдут в октябре в рамках курса «Microservice Architecture»:

  • 23 октября. Метрики и Prometheus: обсудим, как собирать и использовать метрики с помощью Prometheus в Kubernetes для мониторинга приложений. Запись по ссылке

  • 24 октября. Брокеры сообщений: RabbitMQ и Kafka — узнаете, как использовать RabbitMQ и Kafka для организации асинхронной связи между микросервисами. Запись по ссылке


ссылка на оригинал статьи https://habr.com/ru/articles/849810/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *