Привет, Хабр!
Сегодня поговорим о хорошей библиотеке для управления потоками данных в Python – RxPY — реализации Reactive Extensions для нашего любимого языка. В версии 4.0.4 эта библиотека получила ряд улучшений, и сегодня мы разберем её основной функционал.
Основы RxPY
RxPY — это библиотека, реализующая принципы функционального реактивного программирования в Python. Она позволяет создавать и управлять асинхронными потоками данных, объединяя их, фильтруя и трансформируя с помощью цепочек операторов. Основные компоненты RxPY:
-
Observable: источник данных, который может выдавать события.
-
Observer: подписчик, который реагирует на события от Observable.
-
Операторы: функции, позволяющие трансформировать, фильтровать и комбинировать Observable.
Установить RxPY проще простого:
pip install reactivex
В версии 4.0.4 RxPY произошли значительные изменения:
-
Переименование модуля: теперь импортируем
reactivex
, а неrx
. -
Улучшенная типизация: добавлены аннотации типов для лучшей поддержки IDE.
-
Обновленная работа с операторами: использование метода
pipe
для цепочки операторов.
Если ты работал с RxPY v3, вот как все это дело интегрировать:
-
Изменение импорта:
import rx
->import reactivex as rx
. -
Операторы: вместо
observable.map()
теперь используемobservable.pipe(ops.map())
. -
Удалены устаревшие функции: некоторые старые операторы и методы были удалены или переименованы.
Создание Observable и работа с операторами
Создание Observable
just()
Создает Observable, который выдает единственное значение.
import reactivex as rx observable = rx.just(42) observable.subscribe(lambda x: print(f"Значение: {x}"))
from_()
Преобразует итерабельный объект в Observable.
observable = rx.from_([1, 2, 3, 4, 5]) observable.subscribe(lambda x: print(f"Элемент: {x}"))
interval()
Выдает последовательность чисел с заданным интервалом времени.
import time from reactivex import interval observable = interval(1) # каждую секунду subscription = observable.subscribe(lambda x: print(f"Тик: {x}")) time.sleep(5) subscription.dispose()
timer()
Выдает значение после заданной задержки.
from reactivex import timer observable = timer(3) # через 3 секунды observable.subscribe(lambda x: print("Таймер сработал!"))
Трансформация Observable
map()
Применяет функцию к каждому элементу.
from reactivex import operators as ops observable.pipe( ops.map(lambda x: x * x) ).subscribe(lambda x: print(f"Квадрат: {x}"))
flat_map()
Разворачивает вложенные Observable.
def duplicate(x): return rx.from_([x, x*2, x*3]) observable.pipe( ops.flat_map(duplicate) ).subscribe(lambda x: print(f"Значение: {x}"))
scan()
Аналог reduce
, но выдает накопленный результат на каждой итерации.
observable.pipe( ops.scan(lambda acc, x: acc + x, seed=0) ).subscribe(lambda x: print(f"Сумма: {x}"))
Фильтрация данных
filter()
Отбирает элементы, удовлетворяющие условию.
observable.pipe( ops.filter(lambda x: x % 2 == 0) ).subscribe(lambda x: print(f"Четное число: {x}"))
debounce()
Игнорирует значения, если они поступают слишком быстро.
observable.pipe( ops.debounce(0.5) ).subscribe(lambda x: print(f"Получено: {x}"))
distinct()
Пропускает только уникальные значения.
observable.pipe( ops.distinct() ).subscribe(lambda x: print(f"Уникальное значение: {x}"))
Комбинирование Observable
merge()
Объединяет несколько Observable в один поток.
obs1 = rx.from_([1, 2, 3]) obs2 = rx.from_([4, 5, 6]) rx.merge(obs1, obs2).subscribe(lambda x: print(f"Элемент: {x}"))
zip()
Объединяет элементы нескольких Observable в кортежи.
obs1 = rx.from_([1, 2, 3]) obs2 = rx.from_(['a', 'b', 'c']) rx.zip(obs1, obs2).subscribe(lambda x: print(f"Сочетание: {x}"))
combine_latest()
Выдает комбинацию последних элементов из каждого Observable.
obs1 = rx.interval(1) obs2 = rx.interval(1.5) rx.combine_latest(obs1, obs2).subscribe(lambda x: print(f"Комбинация: {x}"))
Тестирование потоков данных
Существуют горячие и холодные Observable
-
Холодные Observable начинают выдавать данные с момента подписки.
-
Горячие Observable уже генерируют данные, независимо от подписчиков.
Пример холодного Observable:
def create_cold_observable(scheduler): return rx.from_([1, 2, 3], scheduler=scheduler) scheduler = reactivex.testing.TestScheduler() observable = create_cold_observable(scheduler)
Пример горячего Observable:
def create_hot_observable(scheduler): return scheduler.create_hot_observable( reactivex.testing.ReactiveTest.on_next(150, 1), reactivex.testing.ReactiveTest.on_next(210, 2), ) scheduler = reactivex.testing.TestScheduler() observable = create_hot_observable(scheduler)
Использование TestScheduler:
from reactivex.testing import TestScheduler, ReactiveTest def test_map_operator(): scheduler = TestScheduler() xs = scheduler.create_hot_observable( ReactiveTest.on_next(150, 1), ReactiveTest.on_next(210, 2), ReactiveTest.on_completed(300) ) def create(): return xs.pipe(ops.map(lambda x: x * 10)) results = scheduler.start(create) assert results.messages == [ ReactiveTest.on_next(210, 20), ReactiveTest.on_completed(300) ]
Тестирование с использованием Marbles
Marble-диаграммы позволяют визуализировать потоки данных.
from reactivex.testing import marbles_testing def test_filter_operator(): with marbles_testing() as (start, cold, hot, exp): source = cold('--1-2-3-4-5-|') expected = exp('----2---4---|') result = start(source.pipe( ops.filter(lambda x: int(x) % 2 == 0) )) assert result == expected
Пару примеров применения RxPY
Интеграция с asyncio
RxPY хорошо сочетается с asyncio
:
import asyncio async def main(): loop = asyncio.get_event_loop() observable = rx.interval(1).pipe( ops.take(5) ) observable.subscribe( on_next=lambda x: print(f"Tick: {x}"), on_error=lambda e: print(f"Error: {e}"), on_completed=lambda: print("Completed"), scheduler=rx.scheduler.AsyncIOScheduler(loop) ) await asyncio.sleep(6) asyncio.run(main())
Также RxPY может помочь при работе с очередями сообщений и кэшем в Redis:
import redis from reactivex import Subject r = redis.Redis() def listen_to_channel(channel): pubsub = r.pubsub() pubsub.subscribe(channel) for message in pubsub.listen(): if message['type'] == 'message': yield message['data'] channel_observable = rx.from_(listen_to_channel('my_channel')) channel_observable.subscribe(lambda msg: print(f"Received: {msg}"))
Обработка событий в event-driven архитектуре:
event_subject = Subject() def handle_event(event): print(f"Handling event: {event}") event_subject.pipe( ops.filter(lambda e: e['type'] == 'click'), ops.map(lambda e: e['payload']) ).subscribe(handle_event) # Где-то в коде event_subject.on_next({'type': 'click', 'payload': {'x': 100, 'y': 200}}) event_subject.on_next({'type': 'hover', 'payload': {'x': 150, 'y': 250}})
Заключение
RxPY — это отличная находка для тех, кто хочет управлять асинхронными потоками данных по новому. Моя рекомендация: используй RxPY, если работаешь с большими объемами асинхронных данных или строишь event-driven системы. В таких проектах она раскроет весь свой потенциал.
У RxPY есть и свой порог вхождения. Если тебе нужно решить простую задачу с минимальным уровнем асинхронности, возможно, дефолтные библиотеки asyncio
или потоки будут проще и быстрее в освоении. Но когда дело доходит до сложных и динамических систем — RxPY может уже понадобиться.
Подробнее с RxPY можете ознакомиться в их гите.
В завершение напоминаем про открытые уроки, которые пройдут в октябре в рамках курса «Microservice Architecture»:
-
23 октября. Метрики и Prometheus: обсудим, как собирать и использовать метрики с помощью Prometheus в Kubernetes для мониторинга приложений. Запись по ссылке
-
24 октября. Брокеры сообщений: RabbitMQ и Kafka — узнаете, как использовать RabbitMQ и Kafka для организации асинхронной связи между микросервисами. Запись по ссылке
ссылка на оригинал статьи https://habr.com/ru/articles/849810/
Добавить комментарий