RxJS за пределами базового использования: как писать свои операторы

от автора

RxJS (Reactive Extensions for JavaScript) — мощный инструмент для работы с асинхронными потоками данных, который используется во многих современных веб-приложениях. Хотя RxJS предоставляет богатую коллекцию операторов, иногда для решения специфических задач бывает необходимо писать свои собственные. Это позволяет избежать дублирования кода и повысить читаемость программы.

Создание своих операторов RxJS может показаться сложным, особенно для тех, кто только начал использовать библиотеку. Однако, фундаментальные принципы их разработки понятны, если погрузиться в механику работы RxJS. В этой статье мы углубимся в то, как создавать собственные pipeable и creation operators, а также рассмотрим практические примеры их применения.

Что такое оператор RxJS?

Оператор RxJS — это чистая функция, которая принимает Observable как входной параметр (или создает Observable) и возвращает иной Observable, преобразованный в соответствии с заданной логикой. Операторы являются основой реактивного программирования, позволяя удобно трансформировать, фильтровать, комбинировать, управлять потоками данных и процессами асинхронной обработки.

Основные характеристики оператора RxJS

  1. Чистота функции: Оператор должен быть чистой функцией. Это значит, что он не влияет на внешние состояния и любой вызов оператора с одинаковыми входными параметрами всегда должен давать одинаковый результат.

    Пример: Оператор map всегда возвращает значения, применяя трансформацию, без побочных эффектов.

  2. Неизменность входящего Observable: Оператор не изменяет исходный Observable, а создает новый — это важное свойство функционального программирования. Оригинальный поток остается неизменным.

  3. К цепочке операторов можно добавлять новые логики: Операторы поддерживают композиционность, что означает, что один или несколько операторов могут объединяться в цепочку, используя метод pipe.

Типы операторов

Операторы делятся на два типа:

  • Pipeable операторы — обычные функции, которые используются в цепочке (pipe). Они принимают поток и возвращают новый поток:

import { of } from 'rxjs'; import { map } from 'rxjs/operators';  of(1, 2, 3).pipe(   map(x => x * 2) ).subscribe(console.log); // 2, 4, 6
  • Creation Functions — функции, которые создают Observable. Например, of, fromEvent:

   import { fromEvent } from 'rxjs';    const clicks$ = fromEvent(document, 'click');    clicks$.subscribe(console.log);

Как работают pipeable операторы

Любой пользовательский pipeable оператор обычно выполняет следующие задачи:

  • Принимает входной поток (source).

  • Добавляет к нему кастомную логику.

  • Создает и возвращает новый Observable, который подчиняется полученной логике.

Вы можете представить их как цепочку обработки данных в конвейере. Давайте разберёмся, как написать такой оператор.

Как создать pipeable оператор

Шаг 1: Определите задачу

Определите, какую задачу будет решать ваш кастомный оператор. Например, давайте реализуем оператор debounceIf, который применяет debounce к потоку только при выполнении определенного условия. Этот оператор может использоваться для оптимизации обработки событий, таких как клики мыши или ввод текста.

Шаг 2: Определение базовой структуры

Создадим функцию, которая будет являться нашим оператором:

import { Observable } from 'rxjs';  export function debounceIf<T>(condition: () => boolean, delayTime: number)   :(source$: Observable<T>) => Observable<T> {   return function (source$: Observable<T>): Observable<T> {     return new Observable<T>(subscriber => {       // Создаем кастомную подписку     });   }; }
  • condition — возвращает true или false, определяя, нужно ли применять debounce.

  • delayTime — время задержки в миллисекундах.

Шаг 3: Реализация логики

import {Observable} from 'rxjs';  export function debounceIf<T>(condition: () => boolean, duration: number)   : (source: Observable<T>) => Observable<T> {   return source => new Observable(observer => {     let timeoutId: any | null = null;     let subscription = source.subscribe({       next(value) {         if (timeoutId) clearTimeout(timeoutId) // сбрасываем таймер чтобы исключить срабатывания         if (condition()) {           // Если условие выполнено, используем debounce           timeoutId = setTimeout(() => {             observer.next(value);             console.log(timeoutId)           }, duration);         } else {           // Если нет, пропускаем значение сразу           observer.next(value);         }       },       error(err) {         observer.error(err);       },       complete() {         observer.complete();       },     });     return () => {       subscription.unsubscribe();       if (timeoutId) clearTimeout(timeoutId);     };   }); } 

Пример использования:

import {Component, OnInit} from '@angular/core'; import {fromEvent} from "rxjs"; import {debounceIf} from "@pipes/debounceIf";  @Component({   selector: 'app-home',   templateUrl: 'home.component.html', }) export class HomeComponent implements OnInit {   clicks: number[] = [];   debounced = false;    ngOnInit(): void {     fromEvent(document, 'click')         .pipe(debounceIf(() => this.debounced, 3000))         .subscribe(() => this.clicks.push(new Date().getTime()));   }    toggleDebounced(): void {     this.debounced = !this.debounced;   } }

Этот код будет пропускать только последний клик по кнопке если в течение двух секунд не происходило новых нажатий при включенном переключателе.


Операторы создания (creation operators)

Сreation function в RxJS отвечает за создание нового Observable с определенной логикой. В отличие от операторов (которые принимают поток и возвращают поток), creation-функция используется там, где вам необходимо начать с чего-то нового, например: фиксированных значений, событий DOM, таймеров и т.д. Примеры встроенных creation functions в RxJS: of, from, interval, fromEvent.

Что создадим:

Предположим, мы хотим создать функцию delayedRange, которая через определенную задержку эмитирует значения из диапазона чисел. Она позволяет настроить:

  • начальное число диапазона;

  • конечное число диапазона;

  • задержку между эмиссиями значений.

Шаги создания кастомной creation function

  • Использование new Observable
    Все кастомные creation functions в RxJS строятся на использовании конструктора Observable. Конструктор принимает аргументом функцию (subscribe), в которой определяется логика эмиссии значений.

  • Поддержка управления подпиской
    Необходимо предоставить механизм для очистки ресурсов (unsubscribe). Это может быть полезно, если подписчик завершает поток раньше времени. Например, нужно очищать таймеры, чтобы не было утечек памяти.

  • Эмит данных с использованием next, обработка ошибок и завершение
    Через объект subscriber мы вызываем соответствующие методы:

    • subscriber.next(value) — эмитируем значение;

    • subscriber.error(err) — сообщаем об ошибке;

    • subscriber.complete() — завершаем поток.

import { Observable } from 'rxjs';  /**  * Функция delayedRange создает Observable, который эмитирует числа  * из указанного диапазона с заданной задержкой между значениями.  *  * @param start Начальное число диапазона.  * @param end Конечное число диапазона.  * @param delayTime Задержка между эмиссиями значений (в миллисекундах).  * @returns Observable, который эмитирует числа из диапазона.  */ export function delayedRange(start: number, end: number, delayTime: number): Observable<number> {   return new Observable<number>(subscriber => {     let current = start;     // Эмитирует значения с заданной задержкой     const emitValue = () => {       if (current <= end) {         subscriber.next(current);         current++;         setTimeout(emitValue, delayTime); // Эмитируем следующее значение       } else {         subscriber.complete(); // Завершаем поток, если достигнут конец диапазона       }     };     emitValue(); // Запускаем эмиссию значений     // Функция очистки, которая вызовется при unsubscribe     return () => {       console.log('Подписка завершена.');     };   }); } 

Объяснение кода

  1. Параметры функции:

    • start: number — начальное значение диапазона.

    • end: number — конечное значение диапазона.

    • delayTime: number — задержка между эмиссиями каждого значения (в миллисекундах).

  2. Создание Observable: Мы напрямую используем new Observable, чтобы задать логику работы потока: Внутри мы описываем, как значения будут эмитироваться подписчику.

  3. Эмит значений через метод next: Значения от start до end эмитируются последовательно с задержкой. Для этого используется метод setTimeout

  4. Очистка ресурсов: Если подписчик завершит подписку до окончания эмиссии, следует освободить ресурсы (в данном случае здесь только лог выводится, но на практике вы можете сбрасывать таймеры, очищать объекты и т.п.):

   return () => {       console.log('Подписка завершена.');    };

Эта функция возвращается из new Observable и вызывается автоматически при вызове unsubscribe.

import { delayedRange } from './custom-operators';  delayedRange(1, 5, 1000).subscribe({   next(value) {     console.log('Получено значение:', value);   },   complete() {     console.log('Поток завершен.');   } });  /* Вывод: Получено значение: 1 Получено значение: 2 Получено значение: 3 Получено значение: 4 Получено значение: 5 Поток завершен. */

Замечания по реализации

  1. Обработка ошибок: Если в процессе создания значений может произойти ошибка, используйте метод subscriber.error(err). В данном примере ошибок нет, но это важно учитывать в более сложных сценариях.

  2. Тестирование: Покройте вашу функцию unit-тестами, чтобы убедиться в корректности ее работы.

  3. Разумное управление ресурсами: В production-коде стоит продумать корректное освобождение ресурсов, особенно если вы работаете с асинхронными API (например, setTimeout, setInterval, HTTP-запросы).

Заключение

Создание собственных операторов в RxJS — это мощный способ расширить возможности реактивного программирования и адаптировать его под уникальные задачи вашего проекта. В этой статье мы рассмотрели, как создавать pipeable и creation операторы для более удобной и читаемой работы с потоками данных.


ссылка на оригинал статьи https://habr.com/ru/articles/883294/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *