Второй шаг в мир RxJS: Операторы RxJS — как изучать и зачем они нужны

от автора

  1. Первый шаг в мир RxJS: знакомство с Observables

  2. Второй шаг в мир RxJS: Операторы RxJS — как изучать и зачем они нужны

Добро пожаловать во вторую статью на тему RxJS! Если вы прочли первую часть, то, скорее всего, уже экспериментировали с from()interval() и знакомились с элементарными операциями — фильтрацией и преобразованием данных. На этом фундаменте мы будем строить знакомство с более хитрыми инструментами, чтобы из просто «интересных экспериментов» RxJS превратился в реальный мощный инструмент для ваших проектов.

Обращаю Ваше внимание на то, что данный опус, как и прочие в этом цикле рассчитаны исключительно на новичков, никаких открытий, прорывов и «умных» мыслей для преисполнившихся здесь не предполагается. Все повествование строится на моём понимании того, как лучше построить процесс изучения обозначенной темы.


Почему операторы так важны?

RxJS можно сравнить с конструктором LEGO: вы получаете базовую платформу (Observable) и набор инструментов (операторы), которые позволяют реализовать практически любое решение, будь то асинхронные запросы, управление событиями или обработка пользовательского ввода.

Сегодняшний вопрос: операторов очень много, как это всё освоить и не запутаться?

Продолжая логику предыдущей статьи, постулируем следующее: операторы покрывают самый широкий спектр задач, но начинать нужно с самых востребованных и понятных инструментов — это позволит вам быстро понять их мощь и интегрировать в реальные задачи.

Как выбрать порядок изучения операторов?

В этой статье мы познакомимся с базовым ядром операторов, которые должны войти в ваш репертуар в первую очередь (после того, как мы освоились с map и filter).

Я выделяю три основных этапа:

  1. Базовые операторы для фильтрации и трансформации данных. (Проще всего начать с них: работают «прямо из коробки», легко применяются к простому коду.)

  2. Комбинационные операторы. (Позволяют объединять источники данных, выбирать нужные потоки и переключаться между ними.)

  3. Механизмы управления подписками. (Работа с отписками, обработка ошибок и управление завершением потоков.)

Сегодня мы продолжим двигаться по первому этапу, углубляя знания.

В прошлой статье вы уже познакомились с filter() и map(). Давайте добавим новый оператор — take().

take() — контролируем количество данных

Если в вашей ленте начали появляться слишком длинные списки или данные, вам нужно ограничить их количество. Например, вы хотите обработать только первые три сообщения:

import { interval } from 'rxjs'; import { take } from 'rxjs/operators';  interval(1000)   .pipe(take(3)) // Установим, что нужно только первые 3 значения   .subscribe(value => console.log(value));  // Вывод: // 0 // 1 // 2 

take() говорит наблюдаемому потоку: «Спасибо, но мне достаточно лишь нескольких первых элементов». Это особенно полезно в бесконечных потоках, таких как interval.


reduce и scan

Теперь самое время познакомиться с операторами, которые позволяют работать со всем потоком данных, а не с отдельными элементами. Например, когда нужно превратить поток в одно итоговое значение или следить за накоплением данных во времени.

Давайте разбираться вместе с операторами reduce и scan.

Почему reduce и scan так важны?

Оба оператора часто сравнивают с инструментами из обычных массивов, вроде  Array.reduce(). Только вместо обработки фиксированного массива  reduce  и  scan работают с потоками данных, которые могут приходить постепенно. Это как если бы к вам по одной приносили монеты, а вы складывали их в копилку:

  1. reduce работает так: складывает монеты в копилке и говорит вам итоговую сумму только тогда, когда процесс завершён.

  2. scan действует иначе: сообщает вам промежуточные суммы каждый раз, как только вы добавляете монетку.

По сути, reduce идеально подходит для финальных вычислений, а scan — для работы с данными в реальном времени.

reduce: финальное значение по завершении

Представьте, что вы организовали благотворительный сбор средств. Люди отправляют вам пожертвования одни за другими, а ваша задача — узнать окончательную собранную сумму. Только после завершения сборов вы делаете объявление:

import { from } from 'rxjs'; import { reduce } from 'rxjs/operators';  const donations = from([10, 20, 30]); // Поток пожертвований  donations.pipe(   reduce((acc, value) => acc + value, 0) // Складываем все пожертвования ).subscribe(total => console.log(`Общая сумма: ${total}`));  // Вывод: // Общая сумма: 60 

Как это работает?

  • reduce() принимает два аргумента:

    1. Функция-аккумулятор (acc, value), которая обновляет накопленное значение.

    2. Начальное значение аккумулятора (в данном случае 0).

  • В нашем примере, мы берём каждый элемент потока (donation) и добавляем его к общей сумме acc.

Обратите внимание: результат вы получаете только после полного завершёния потока. Если поток бесконечный (например, interval), reduce просто не сработает.

Пример: подсчёт заказов

Теперь давайте применим reduce к задаче, где у нас есть список заказов, и мы хотим посчитать общий доход компании:

import { from } from 'rxjs'; import { reduce } from 'rxjs/operators';  const orders = from([   { id: 1, total: 100 },   { id: 2, total: 250 },   { id: 3, total: 50 } ]);  orders.pipe(   reduce((acc, order) => acc + order.total, 0) // Суммируем доходы ).subscribe(totalRevenue => console.log(`Общий доход: $${totalRevenue}`));  // Вывод: // Общий доход: $400 

С таким же успехом можно было бы считать количество элементов, найти максимум/минимум или объединить текстовые строки.

scan: отслеживание в реальном времени

Теперь представьте, что вы не хотите ждать финального результата, а хотите сразу наблюдать за промежуточными суммами. Например, вы запускаете таймер на пробежке и хотите видеть каждый новый результат времени.

Вот где приходит на помощь scan():

import { from } from 'rxjs'; import { scan } from 'rxjs/operators';  const donations = from([10, 20, 30]); // Поток пожертвований  donations.pipe(   scan((acc, value) => acc + value, 0) // Считаем сумму на каждом шаге ).subscribe(currentSum => console.log(`Текущая сумма: ${currentSum}`));  // Вывод: // Текущая сумма: 10 // Текущая сумма: 30 // Текущая сумма: 60 

Особенность scan — результат обновляется на каждом шаге, а не только в конце.

Пример с временными данными: банковский счёт

Теперь представьте другой пример: вы снимаете и кладёте деньги на банковский счёт. Оператор scan отлично подходит, чтобы в реальном времени подсчитывать остаток:

import { from } from 'rxjs'; import { scan } from 'rxjs/operators';  const transactions = from([100, -50, 200, -75]); // Пополнение и снятие средств  transactions.pipe(   scan((balance, transaction) => balance + transaction, 0) // Считаем баланс ).subscribe(currentBalance => console.log(`Текущий баланс: $${currentBalance}`));  // Вывод: // Текущий баланс: $100 // Текущий баланс: $50 // Текущий баланс: $250 // Текущий баланс: $175 

Здесь каждое новое значение позволяет обновлять текущий баланс счёта. Вы видите изменения мгновенно, что идеально подходит для мониторинга.

Сравнение reduce и scan

Для лучшего понимания давайте наглядно сравним, как работают  reduce  и  scan.  Например, мы обрабатываем один и тот же поток данных [1, 2, 3]:

  • reduce: вернёт только итог — 6.

  • scan: сначала вернёт 1, потом 3, а потом — 6.

Пример кода:

import { of } from 'rxjs'; import { reduce, scan } from 'rxjs/operators';  const numbers = of(1, 2, 3);  numbers.pipe(   reduce((acc, value) => acc + value, 0) ).subscribe(result => console.log(`reduce: ${result}`)); // reduce: 6  numbers.pipe(   scan((acc, value) => acc + value, 0) ).subscribe(result => console.log(`scan: ${result}`)); // scan: 1, 3, 6 

Когда использовать reduce, а когда scan?

Вот несколько подсказок по использованию:

Используйте reduce, когда:

  • Вы работаете с конечным потоком (например, массив данных, который завершится).

  • Вас интересует только окончательный результат.

  • Вам нужно вычисление итоговой суммы, произведения или обработка всех элементов потока.

Используйте scan, когда:

  • Вы хотите видеть данные в реальном времени.

  • Поток может быть бесконечным.

  • Нужно отслеживать промежуточные состояния: например, расчёт баланса, подсчёт нарастающих сумм или обновлений.

Как двигаться дальше?

После освоения этих двух инструментов вы можете начать комбинировать их с другими операторами. Например, попробуйте добавить фильтрацию перед reduce или посмотрите, как scan ведёт себя в сочетании с map для преобразования промежуточного состояния.

Дальнейшие шаги могут включать:

  • Использование scan для сложных состояний, например, работы с объектами.

  • Изучение более сложных трансформаций с reduce, чтобы собирать массивы или превращать поток данных в более сложные структуры.

Экспериментируйте, задавайте себе реальные задачи, пробуйте решать их разными способами — это лучший способ освоить RxJS.

Операторы debounceTime и throttleTime

Пришло время поговорить об управлении частотой событий, где главными героями станут  debounceTime и throttleTime.

Эти операторы являются чрезвычайно полезными, когда мы имеем дело с множеством событий, которые генерируются с высокой частотой (например, клики мышью или ввод в текстовое поле). Они позволяют управлять количеством и частотой этих событий, чтобы предотвращать перегрузку ресурсов.

Почему важны debounceTime и throttleTime?

Представьте ситуацию: пользователь быстро вводит текст в поисковую строку, вызывая запрос на сервер при каждом изменении введённых символов. Вы не хотите посылать запросы каждую миллисекунду, так как это перегрузит сервер. Вы хотите оптимизировать процесс, обрабатывая только последнее событие ввода через небольшой промежуток времени. Тут и приходит на помощь оператор debounceTime.

В другой ситуации вы, наоборот, хотите ограничить частоту обработки событий, пропуская одно событие каждые несколько секунд, например, при трекинге перемещения мыши. Для этого идеально подходит throttleTime.

Оба оператора действуют как фильтры для частых событий, но делают это по-разному.

debounceTime: ждём паузы между событиями

Как работает debounceTime?

debounceTime позволяет откладывать обработку события до тех пор, пока поток не затихнет на заданное время. Если в течение этого времени приходит новое событие, отсчёт времени начинается заново. Таким образом, debounceTime всегда пропускает последнее событие из серии.

Пример: поиск с задержкой

с этого момента мы переходим к примерам, которые уже не так просто запустить в онлайн-инструментах вроде PlayCode, упомянутого в первой статье, возможно стоит переходить к локальным проектам для запуска кода.

Рассмотрим классическую задачу: пользователь вводит поисковый запрос в текстовое поле. Чтобы избежать создания запроса на сервер при каждом нажатии клавиши, мы используем debounceTime:

import { fromEvent } from 'rxjs'; import { debounceTime, distinctUntilChanged, map } from 'rxjs/operators';  const searchInput = document.getElementById('search');  // Создаем поток событий из текстового поля fromEvent(searchInput, 'input').pipe(   map((event: any) => event.target.value), // Извлекаем текст из поля ввода   debounceTime(300), // Ждём 300мс после последнего ввода ).subscribe(searchTerm => console.log(`Запрос: ${searchTerm}`));  // Вывод: // Пользователь быстро вводит подряд «RxJ», затем останавливается на «RxJS». // На экран выводится только: «Запрос: RxJS». 
  • Как работает здесь debounceTime:

    • Если пользователь останавливается хотя бы на 300 мс, поток передаёт последнее изменение.

    • Если пользователь продолжает вводить символы, отсчёт времени сбрасывается.

  • Этот подход полезен для оптимизации обработки ввода или других частых событий.

Пример: автосохранение с задержкой

А теперь пример автосохранения документа, когда при каждом изменении текст сохраняется не моментально, а после паузы, чтобы избежать слишком частых запросов:

import { fromEvent } from 'rxjs'; import { debounceTime, map } from 'rxjs/operators';  const textarea = document.getElementById('editor');  fromEvent(textarea, 'input').pipe(   map((event: any) => event.target.value), // Извлекаем текст из редактора   debounceTime(1000) // Ждём 1 секунду после последнего изменения ).subscribe(content => console.log(`Сохраняем: ${content}`));  // Пользователь пишет текст: «Привет, RxJS». // Событие записи вызывается только после секундной паузы. 

throttleTime: ограничиваем частоту событий

Как работает throttleTime?

throttleTime позволяет пропускать события не чаще заданного интервала времени, игнорируя все остальные события. Если поток генерирует несколько событий в течение временного интервала, оператор пропустит только первое.

Пример: трекинг положения мыши

Допустим, вы хотите следить за перемещением мыши, но не хотите перегружать обработчик огромным количеством событий. Вместо этого вы хотите получать координаты мыши раз в 500 мс:

import { fromEvent } from 'rxjs'; import { throttleTime, map } from 'rxjs/operators';  fromEvent(document, 'mousemove').pipe(   throttleTime(500), // Пропускаем только одно событие каждые 500мс   map((event: MouseEvent) => ({ x: event.clientX, y: event.clientY })) ).subscribe(position => console.log(`Координаты мыши: (${position.x}, ${position.y})`));  // Вывод: // Двигаем мышь быстро по экрану. Координаты выводятся только раз в 500 мс. 

Как работает throttleTime:

  • Это похоже на включение «таймера» после первого события.

  • Все следующие события игнорируются до тех пор, пока не завершится заданный интервал времени (в нашем случае 500 мс).

Пример: блокировка многократного нажатия кнопки

Рассмотрим задачу: у вас есть кнопка, которая отправляет запрос на сервер. Чтобы избежать отправки нескольких запросов при многократных нажатиях, используем throttleTime. Это позволит обрабатывать только одно нажатие каждые 2 секунды:

import { fromEvent } from 'rxjs'; import { throttleTime } from 'rxjs/operators';  const button = document.getElementById('submit');  fromEvent(button, 'click').pipe(   throttleTime(2000) // Пропускаем только одно нажатие каждые 2 секунды ).subscribe(() => console.log('Запрос отправлен!'));  // Вывод: // Пользователь быстро нажимает кнопку 5 раз подряд. // На экран выводится только один запрос каждые 2 секунды. 

Сравнение debounceTime и throttleTime

Оба оператора позволяют контролировать частоту событий, но делают это по-разному. Вот их ключевые различия:

Характеристика

debounceTime

throttleTime

Как работает

Пропускает событие только после паузы.

Пропускает первое событие в каждом интервале.

Подходит для

Когда вам нужен только последний элемент.

Когда вам нужно ограничить частоту событий.

Игнорирует события

Все события до окончания паузы.

Все события во время интервала.

Когда использовать debounceTime, а когда throttleTime?

Используйте debounceTime, если:

  • Вы хотите дождаться, пока поток затихнет.

  • Работа связана с поиском, автосохранением или другими сценариями, где важен последний ввод.

  • Необходимо минимизировать количество запросов.

Используйте throttleTime, если:

  • Вы хотите ограничить частоту событий.

  • События происходят в реальном времени (например, трекинг мыши, скроллинг).

  • Нужно предотвратить многократные срабатывания одной и той же функции за короткий промежуток времени.

Операторы debounceTime и throttleTime помогают эффективно управлять потоками частых событий, упрощая обработку пользовательских взаимодействий. С их помощью вы сможете избежать лишних вычислений, оптимизировать запросы к серверу и улучшить пользовательский опыт.

Как и всегда, попробуйте использовать их в реальных задачах, экспериментируя с параметрами задержек и интервалов. Практика — лучший способ освоить эти полезные инструменты!

mergeMap и switchMap

Они используются для работы с потоками данных, когда каждому элементу из исходного потока нужно сопоставить новый поток. То есть каждый элемент запускает дополнительный процесс, например, асинхронный запрос или преобразование.

Для чего нужны mergeMap и switchMap?

Допустим, у вас есть поток данных (например, нажатия кнопки, текстовой ввод или даже список чисел), и каждый элемент вызывает дополнительное действие: делает запрос, преобразует данные или что-то ещё. Проблема в том, что такие действия могут вызывать много параллельных задач, и управлять ими вручную сложно.

Вот что делают mergeMap и switchMap:

  • mergeMap запускает все действия одновременно (параллельно) и возвращает результаты по мере их завершения.

  • switchMap отменяет предыдущее действие, если приходит новое событие, и возвращает результат только для последнего.

mergeMap: обрабатываем все действия одновременно

Как работает mergeMap?

При использовании mergeMap каждое событие в потоке вызывает новое действие, и все запущенные действия работают вместе. Он не отменяет старые задания, а ждёт завершения всех.

Пример: несколько чисел — несколько потоков текста

Здесь уже сложновато упрощать примеры, потому придётся читать внимательнее)

Давайте создадим стандартный поток чисел [1,2].

Для каждого элемента в потоке запустим отдельный поток, возвращающий строку <номер базового потока>-<время в секундах>. Заодно ограничим внутренний поток, чтобы не работать с длинным выводом, двух значений нам хватит.

import { from,interval } from 'rxjs'; import { mergeMap,take,map } from 'rxjs/operators';  // Поток чисел const numbers = from([1, 2]);  // Применяем mergeMap numbers.pipe(   mergeMap(num => interval(1000).pipe(take(2),map(i => (`Число ${num}-${i}`)))) // Преобразуем число в текст ).subscribe(result => console.log(result));  // Вывод: // Число 1-0 // Число 2-0 // Число 1-1 // Число 2-1 
  • Здесь каждый элемент потока [1, 2] запускает свой собственный поток, по мере завершения задач, осуществляется вывод в консоль сформированных строк. Обратите внимание на вывод: мы не ждём выполнения первой задачи, оба потока работают одновременно.

  • Здесь мы имеем дело с синтетикой, но задача вполне себе жизненная, просто можно представить, что у нас не просто числа, а Id пользователей и для каждого такого идентификатора мы хотим сделать асинхронный запрос к удаленному серверу.

switchMap: обрабатываем только последнее действие

Как работает switchMap?

switchMap работает по-другому: он отменяет все предыдущие действия, если приходит новое событие. Это полезно там, где нам нужно только последнее действие. Например, если пользователь вводит текст в поиск, старые запросы ненужны, ведь появился новый ввод.

Пример: несколько чисел, но важно только последнее

Возьмём поток чисел, но представим, что нам нужно обновлять текст только для последнего числа. Если приходит новое событие, предыдущее игнорируется:

import { from,interval } from 'rxjs'; import { switchMap,take,map } from 'rxjs/operators';  // Поток чисел const numbers = from([1, 2]);  // Применяем switchMap numbers.pipe(   switchMap(num => interval(1000).pipe(take(2),map(i => (`Число ${num}-${i}`)))) // Преобразуем число в текст ).subscribe(result => console.log(result));  // Вывод: // Число 2-0 // Число 2-1 
  • switchMap отменяет обработку первого значения, так как второе значение пришло позже.

Отличие между mergeMap и switchMap

Хотя оба оператора сопоставляют текущий поток (source) с новыми потоками данных, они подходят для разных задач.

Характеристика

mergeMap

switchMap

Что делает

Запускает все действия одновременно.

Отменяет предыдущие действия, оставляет только последнее.

Когда использовать

Если вас интересует результат всех заданий.

Если вас интересует только последнее задание.

Пример

Преобразование всех событий (например, кликов).

Поиск сообщений, текстовое поле, где важно только последнее значение.

Когда использовать mergeMap, а когда switchMap?

Используйте mergeMap, если:

  • Вы хотите обрабатывать каждое событие отдельно.

  • Нужно запускать несколько параллельных асинхронных операций.

  • Все результаты важны, независимо от порядка или времени выполнения.

Пример: обработка всех кликов мыши или отправка нескольких запросов на сервер.

Используйте switchMap, если:

  • Вас интересует только последнее событие.

  • Результаты предыдущих событий становятся ненужными, если приходит новое.

  • Вы работаете с потоками, которые обновляются часто — поиск, перемещение ползунка.

Стоит признать, что разобраться с этими двумя операторами, imho, будет сложнее всего и простых синтетических примеров уже не достаточно для понимания задач, решаемых этими ребятами, но тут, как и прежде, главный посыл: экспериментируйте. Это последняя пара операторов без которых точно не обойтись и если они кажутся вам чересчур сложными и малоприменимыми, вполне можно отложить их пока, возможно в ближайшем будущем найдется задача и под них, главное, чтобы вы поняли общую логику их работы, чтобы распознать саму возможноть применения.

Следующие шаги: комбинируем знания

Теперь, когда вы уже способны работать с базовыми операторами, пора создавать интересные комбинации. Рассмотрим задачу: каждую секунду генерируется число, но вы хотите видеть только чётные числа и при этом удваивать их:

import { interval } from 'rxjs'; import { filter, map, take } from 'rxjs/operators';  interval(1000)   .pipe(     filter(value => value % 2 === 0), // Оставим только чётные значения     map(value => value * 2),         // Удвоим их     take(5)                          // Возьмём только первые пять   )   .subscribe(value => console.log(value));  // Вывод: // 0 // 4 // 8 // 12 // 16 

Такие комбинации позволяют задействовать сразу несколько операторов для обработки потока данных. Всё, что остаётся вам, — это разделять логику операторов на шаги, чтобы не запутаться.

Заключение

Погружение в RxJS — это путь, требующий времени, экспериментов и постепенного изучения. Начинать лучше с малого: опробовать базовые операторы, такие как map, filter, take, понимая их на примерах. Это позволит заложить крепкий фундамент для работы с более сложными инструментами. Наседайте на те операторы, которые помогли решить конкретные задачи или стали интуитивно понятными. Практика — ключ к освоению любой технологии.

Не стоит стремиться выучить все операторы сразу. RxJS — мощный инструмент, который постепенно раскрывает свой потенциал с ростом вашего опыта. Достаточно освоить несколько ключевых операторов (как мы сделали в этих статьях), а затем постепенно расширять свой арсенал по мере возникновения потребностей. У офциальной документации RxJS удобная структура для поиска операторов — и это лучший инструмент для их изучения, когда базовое понимание уже есть.

Помните: главное — не количество выученных операторов, а ваш комфорт в их использовании и понимание логики потоков данных. Решайте реальные задачи, экспериментируйте с операторами, комбинируйте их, создавайте свои небольшие проекты. Такой подход позволит вам двигаться от простого к сложному легко и без перегрузок.

В третьей статье мы поговорим о распространённых ошибках при управлении потоками RxJS, которые часто усложняют жизнь разработчикам. Берегите свою продуктивность — до встречи в следующем шаге!


ссылка на оригинал статьи https://habr.com/ru/articles/895542/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *