RxJS (Reactive Extensions for JavaScript) — мощный инструмент для работы с асинхронными потоками данных, который используется во многих современных веб-приложениях. Хотя RxJS предоставляет богатую коллекцию операторов, иногда для решения специфических задач бывает необходимо писать свои собственные. Это позволяет избежать дублирования кода и повысить читаемость программы.
Создание своих операторов RxJS может показаться сложным, особенно для тех, кто только начал использовать библиотеку. Однако, фундаментальные принципы их разработки понятны, если погрузиться в механику работы RxJS. В этой статье мы углубимся в то, как создавать собственные pipeable и creation operators, а также рассмотрим практические примеры их применения.
Что такое оператор RxJS?
Оператор RxJS — это чистая функция, которая принимает Observable как входной параметр (или создает Observable) и возвращает иной Observable, преобразованный в соответствии с заданной логикой. Операторы являются основой реактивного программирования, позволяя удобно трансформировать, фильтровать, комбинировать, управлять потоками данных и процессами асинхронной обработки.
Основные характеристики оператора RxJS
-
Чистота функции: Оператор должен быть чистой функцией. Это значит, что он не влияет на внешние состояния и любой вызов оператора с одинаковыми входными параметрами всегда должен давать одинаковый результат.
Пример: Оператор map всегда возвращает значения, применяя трансформацию, без побочных эффектов.
-
Неизменность входящего Observable: Оператор не изменяет исходный Observable, а создает новый — это важное свойство функционального программирования. Оригинальный поток остается неизменным.
-
К цепочке операторов можно добавлять новые логики: Операторы поддерживают композиционность, что означает, что один или несколько операторов могут объединяться в цепочку, используя метод pipe.
Типы операторов
Операторы делятся на два типа:
-
Pipeable операторы — обычные функции, которые используются в цепочке (pipe). Они принимают поток и возвращают новый поток:
import { of } from 'rxjs'; import { map } from 'rxjs/operators'; of(1, 2, 3).pipe( map(x => x * 2) ).subscribe(console.log); // 2, 4, 6
-
Creation Functions — функции, которые создают Observable. Например, of, fromEvent:
import { fromEvent } from 'rxjs'; const clicks$ = fromEvent(document, 'click'); clicks$.subscribe(console.log);
Как работают pipeable операторы
Любой пользовательский pipeable оператор обычно выполняет следующие задачи:
-
Принимает входной поток (source).
-
Добавляет к нему кастомную логику.
-
Создает и возвращает новый Observable, который подчиняется полученной логике.
Вы можете представить их как цепочку обработки данных в конвейере. Давайте разберёмся, как написать такой оператор.
Как создать pipeable оператор
Шаг 1: Определите задачу
Определите, какую задачу будет решать ваш кастомный оператор. Например, давайте реализуем оператор debounceIf, который применяет debounce к потоку только при выполнении определенного условия. Этот оператор может использоваться для оптимизации обработки событий, таких как клики мыши или ввод текста.
Шаг 2: Определение базовой структуры
Создадим функцию, которая будет являться нашим оператором:
import { Observable } from 'rxjs'; export function debounceIf<T>(condition: () => boolean, delayTime: number) :(source$: Observable<T>) => Observable<T> { return function (source$: Observable<T>): Observable<T> { return new Observable<T>(subscriber => { // Создаем кастомную подписку }); }; }
-
condition — возвращает true или false, определяя, нужно ли применять debounce.
-
delayTime — время задержки в миллисекундах.
Шаг 3: Реализация логики
import {Observable} from 'rxjs'; export function debounceIf<T>(condition: () => boolean, duration: number) : (source: Observable<T>) => Observable<T> { return source => new Observable(observer => { let timeoutId: any | null = null; let subscription = source.subscribe({ next(value) { if (timeoutId) clearTimeout(timeoutId) // сбрасываем таймер чтобы исключить срабатывания if (condition()) { // Если условие выполнено, используем debounce timeoutId = setTimeout(() => { observer.next(value); console.log(timeoutId) }, duration); } else { // Если нет, пропускаем значение сразу observer.next(value); } }, error(err) { observer.error(err); }, complete() { observer.complete(); }, }); return () => { subscription.unsubscribe(); if (timeoutId) clearTimeout(timeoutId); }; }); }
Пример использования:
import {Component, OnInit} from '@angular/core'; import {fromEvent} from "rxjs"; import {debounceIf} from "@pipes/debounceIf"; @Component({ selector: 'app-home', templateUrl: 'home.component.html', }) export class HomeComponent implements OnInit { clicks: number[] = []; debounced = false; ngOnInit(): void { fromEvent(document, 'click') .pipe(debounceIf(() => this.debounced, 3000)) .subscribe(() => this.clicks.push(new Date().getTime())); } toggleDebounced(): void { this.debounced = !this.debounced; } }
Этот код будет пропускать только последний клик по кнопке если в течение двух секунд не происходило новых нажатий при включенном переключателе.
Операторы создания (creation operators)
Сreation function в RxJS отвечает за создание нового Observable с определенной логикой. В отличие от операторов (которые принимают поток и возвращают поток), creation-функция используется там, где вам необходимо начать с чего-то нового, например: фиксированных значений, событий DOM, таймеров и т.д. Примеры встроенных creation functions в RxJS: of, from, interval, fromEvent.
Что создадим:
Предположим, мы хотим создать функцию delayedRange, которая через определенную задержку эмитирует значения из диапазона чисел. Она позволяет настроить:
-
начальное число диапазона;
-
конечное число диапазона;
-
задержку между эмиссиями значений.
Шаги создания кастомной creation function
-
Использование new Observable
Все кастомные creation functions в RxJS строятся на использовании конструктора Observable. Конструктор принимает аргументом функцию (subscribe), в которой определяется логика эмиссии значений. -
Поддержка управления подпиской
Необходимо предоставить механизм для очистки ресурсов (unsubscribe). Это может быть полезно, если подписчик завершает поток раньше времени. Например, нужно очищать таймеры, чтобы не было утечек памяти. -
Эмит данных с использованием next, обработка ошибок и завершение
Через объект subscriber мы вызываем соответствующие методы:-
subscriber.next(value) — эмитируем значение;
-
subscriber.error(err) — сообщаем об ошибке;
-
subscriber.complete() — завершаем поток.
-
import { Observable } from 'rxjs'; /** * Функция delayedRange создает Observable, который эмитирует числа * из указанного диапазона с заданной задержкой между значениями. * * @param start Начальное число диапазона. * @param end Конечное число диапазона. * @param delayTime Задержка между эмиссиями значений (в миллисекундах). * @returns Observable, который эмитирует числа из диапазона. */ export function delayedRange(start: number, end: number, delayTime: number): Observable<number> { return new Observable<number>(subscriber => { let current = start; // Эмитирует значения с заданной задержкой const emitValue = () => { if (current <= end) { subscriber.next(current); current++; setTimeout(emitValue, delayTime); // Эмитируем следующее значение } else { subscriber.complete(); // Завершаем поток, если достигнут конец диапазона } }; emitValue(); // Запускаем эмиссию значений // Функция очистки, которая вызовется при unsubscribe return () => { console.log('Подписка завершена.'); }; }); }
Объяснение кода
-
Параметры функции:
-
start: number — начальное значение диапазона.
-
end: number — конечное значение диапазона.
-
delayTime: number — задержка между эмиссиями каждого значения (в миллисекундах).
-
-
Создание Observable: Мы напрямую используем new Observable, чтобы задать логику работы потока: Внутри мы описываем, как значения будут эмитироваться подписчику.
-
Эмит значений через метод next: Значения от start до end эмитируются последовательно с задержкой. Для этого используется метод setTimeout
-
Очистка ресурсов: Если подписчик завершит подписку до окончания эмиссии, следует освободить ресурсы (в данном случае здесь только лог выводится, но на практике вы можете сбрасывать таймеры, очищать объекты и т.п.):
return () => { console.log('Подписка завершена.'); };
Эта функция возвращается из new Observable и вызывается автоматически при вызове unsubscribe.
import { delayedRange } from './custom-operators'; delayedRange(1, 5, 1000).subscribe({ next(value) { console.log('Получено значение:', value); }, complete() { console.log('Поток завершен.'); } }); /* Вывод: Получено значение: 1 Получено значение: 2 Получено значение: 3 Получено значение: 4 Получено значение: 5 Поток завершен. */
Замечания по реализации
-
Обработка ошибок: Если в процессе создания значений может произойти ошибка, используйте метод subscriber.error(err). В данном примере ошибок нет, но это важно учитывать в более сложных сценариях.
-
Тестирование: Покройте вашу функцию unit-тестами, чтобы убедиться в корректности ее работы.
-
Разумное управление ресурсами: В production-коде стоит продумать корректное освобождение ресурсов, особенно если вы работаете с асинхронными API (например, setTimeout, setInterval, HTTP-запросы).
Заключение
Создание собственных операторов в RxJS — это мощный способ расширить возможности реактивного программирования и адаптировать его под уникальные задачи вашего проекта. В этой статье мы рассмотрели, как создавать pipeable и creation операторы для более удобной и читаемой работы с потоками данных.
ссылка на оригинал статьи https://habr.com/ru/articles/883294/
Добавить комментарий