Привет, Хабр! Меня зовут Константинов Александр, я Android-разработчик в «Студии Олега Чулакова». Сегодня мы сравим операторы RxJava 3 и Flow. Статья будет полезна как для изучения операторов, так и для более легкого перехода с RxJava на Flow. Буду рад вашему фидбеку и комметариям.
Ну что ж, давайте начинать!
В этой статье рассмотрим только самые популярые операторы и примеры кода:
Операторы преобразования
RxJava map / Flow map:
Преобразует элементы, применяя функцию к каждому из них. Работает последовательно.
Пример кода
//RxJava map fun main() { Observable.range(1, 5) .map { it * 2 } .subscribe(::println) // Вывод: 2, 4, 6, 8, 10 }
//Flow map fun main() = runBlocking { (1..5).asFlow() .map { it * 2 } .collect { println(it) } // Вывод: 2, 4, 6, 8, 10 }
RxJava flatMap / Flow flatMapMerge:
Преобразует каждое значение в поток. RxJava flatMap
/ Flow flatMapMerge
, в отличие от RxJava concatMap
/ Flow flatMapConcat
, обрабатывает элементы исходного потока параллельно, а не последовательно. Потоки для каждого элемента запускаются одновременно и эмитят значения в порядке их готовности, а не поочередно, поэтому порядок значений может отличаться и не соответствовать исходному. Следует использовать, когда порядок обработки не критичен и можно выиграть в производительности от параллельной обработки.
Пример кода
//RxJava flatMap fun main() { Observable.just(1, 2, 3) .flatMap { i -> Observable.just(i * 10) .concatWith(Observable.just(i * 20).delay(10, TimeUnit.MILLISECONDS)) } .blockingSubscribe(::println) }
//Flow flatMapMerge fun main() = runBlocking { flowOf(1, 2, 3) .flatMapMerge { flow { emit(it * 10); delay(10); emit(it * 20) } } .collect { println(it) } }
RxJava concatMap / Flow flatMapConcat:
Преобразует каждое значение в поток и объединяет их последовательно. Берет каждый элемент из исходного потока и, для каждого элемента, создает новый поток. Эти потоки затем объединяются в один линейный поток. Важно отметить, что объединение происходит поочередно: следующий поток начинается только после того, как завершен предыдущий.
Пример кода
//RxJava concatMap fun main() { Observable.just(1, 2, 3) .concatMap { i -> Observable.just(i * 10, i * 20) } .subscribe(::println) // Вывод: 10, 20, 20, 40, 30, 60 }
//Flow flatMapConcat fun main() = runBlocking { flowOf(1, 2, 3) .flatMapConcat { flowOf(it * 10, it * 20) } .collect { println(it) } // Вывод: 10, 20, 20, 40, 30, 60 }
RxJava buffer / Flow buffer:
Группирует элементы в списки фиксированного размера.
Используется для обработки элементов в буферах (или пакетами), что позволяет повысить производительность, особенно в случаях, когда обработка каждого элемента по отдельности блокирует лишние ресурсы.
Пример кода
//RxJava buffer fun main() { Observable.just(1, 2, 3, 4, 5) .doOnNext { println("Emitting $it") } .buffer(2) // Буферизуем элементы по 2 .subscribe { buffer -> println("Collected $buffer") } Thread.sleep(1000) }
//Flow buffer fun main() = runBlocking { flowOf(1, 2, 3, 4, 5) .onEach { println("Emitting $it") } .buffer() // Буферизуем все элементы .collect { value -> println("Collected $value") } }
RxJava scan / Flow scan:
Последовательно применяет функцию к элементам, возвращая промежуточные результаты. Используется для накопления значений с возможностью учета предыдущих накопленных значений.
Пример кода
//RxJava scan fun main() { Observable.fromArray(1, 2, 3) .scan(0) { accumulator, value -> accumulator + value } .subscribe { result -> println(result) // Вывод: 0, 1, 3, 6 } }
//Flow scan fun main() = runBlocking<Unit> { // Создаем Flow от 1 до 3 val flow = (1..3).asFlow() // Применяем оператор scan flow.scan(0) { accumulator, value -> accumulator + value }.collect { result -> println(result) // Вывод: 0, 1, 3, 6 } }
RxJava groupBy / Аналог на Flow отсутствует:
Разбивает поток на несколько потоков, сгруппированных по ключам.
Пример кода
fun main() { val observable = Observable.just( "one", "two", "three", "four", "five", "six" ) observable.groupBy { it.length } .flatMapSingle { group -> group.toList().map { list -> Pair(group.key, list) } } .subscribe { pair -> println("Length: ${pair.first}, Words: ${pair.second}") } }
Фильтрация
RxJava distinctUntilChanged / Flow distinctUntilChanged
Исключает повторяющиеся последовательные элементы.
Пример кода
//RxJava distinctUntilChanged fun main() { Observable.just(1, 1, 2, 2, 3, 1) .distinctUntilChanged() .subscribe(::println) // Вывод: 1, 2, 3, 1 }
//Flow distinctUntilChanged fun main() = runBlocking { flowOf(1, 1, 2, 2, 3, 1) .distinctUntilChanged() .collect { println(it) } // Вывод: 1, 2, 3, 1 }
RxJava filter / Flow filter:
Отбирает элементы, соответствующие условию.
Пример кода
//RxJava filter fun main() { Observable.range(1, 5) .filter { it % 2 == 0 } .subscribe(::println) // Вывод: 2, 4 }
//Flow filter fun main() = runBlocking { (1..5).asFlow() .filter { it % 2 == 0 } .collect { println(it) } // Вывод: 2, 4 }
RxJava take / Flow take:
Берет первые N элементов из входящего потока.
Пример кода
//RxJava take fun main() { Observable.range(1, 10) .take(3) .subscribe(::println) // Вывод: 1, 2, 3 }
//Flow take fun main() = runBlocking { (1..10).asFlow() .take(3) .collect { println(it) } // Вывод: 1, 2, 3 }
RxJava skip / Flow drop:
Пропускает первые N элементов.
Пример кода
//Flow drop fun main() = runBlocking { flowOf(1, 2, 3, 4, 5) .drop(2) // Пропускаем первые 2 элемента .collect { value -> println(value) // Выведет 3, 4, 5 } }
//RxJava skip fun main() { Observable.just(1, 2, 3, 4, 5) .skip(2) // Пропускаем первые 2 элемента .subscribe { value -> println(value) // Выведет 3, 4, 5 } }
Комбинирование
RxJava merge / Flow merge:
Объединяет заданные потоки в один поток. Все потоки объединяются одновременно, без ограничения на количество одновременно собираемых потоков.
Пример кода
//Flow merge fun main() = runBlocking { val flow1 = flow { repeat(3) { emit("From flow1: $it") delay(500L) // Эмиссия каждые 500 мс } } val flow2 = flow { repeat(3) { emit("From flow2: $it") delay(1000L) // Эмиссия каждые 1000 мс } } val startTime = System.currentTimeMillis() merge(flow1, flow2).collect { value -> println("$value at ${System.currentTimeMillis() - startTime} ms") } }
//RxJava merge fun main() { val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS) .take(3) .map { "From observable1: $it" } val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS) .take(3) .map { "From observable2: $it" } val startTime = System.currentTimeMillis() Observable.merge(observable1, observable2) .subscribe { value -> println("$value at ${System.currentTimeMillis() - startTime} ms") } // Ожидание завершения потоков Thread.sleep(4000) }
RxJava zip / Flow zip:
Объединяет потоки, применяя функцию комбинирования к элементам. Элемент для которого нет пары не испускается.
Пример кода
//RxJava zip fun main() { val nums = Observable.range(1, 3) val letters = Observable.just("A", "B", "C", "D") Observable.zip(nums, letters) { n, l -> "$n -> $l" } .subscribe(::println) // Вывод: 1 -> A, 2 -> B, 3 -> C //D - не эмиттится }
//Fllow zip fun main() = runBlocking { val numbers = (1..3).asFlow() val letters = flowOf("A", "B", "C", "D") numbers.zip(letters) { n, l -> "$n -> $l" } .collect { println(it) } // Вывод: 1 -> A, 2 -> B, 3 -> C //D - не эмиттится }
RxJava combineLatest / Flow combine:
Объединяет потоки, используя последние элементы из каждого.
Пример кода
//Flow combine fun main() = runBlocking { val flow1 = flow { repeat(3) { emit("String: s$it") delay(500L) // Эмиссия каждые 500 мс } } val flow2 = flow { repeat(3) { emit(it) delay(1000L) // Эмиссия каждые 1000 мс } } flow1.combine(flow2) { str, num -> "$str and Int: $num" }.collect { value -> println(value) } }
//RxJava combineLatest fun main() { val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS) .take(3) .map { "String: s$it" } val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS) .take(3) Observable.combineLatest(observable1, observable2) { str: String, num: Long -> "$str and Int: $num" } .subscribe { value -> println(value) } Thread.sleep(5000L) }
Агрегация
RxJava reduce / Flow reduce:
В Kotlin Flow оператор reduce аналогичен одноименному оператору в RxJava. Оба используются для преобразования коллекции значений в одиночное значение, используя начальное значение и функцию агрегирования (аккумулятора). Применяет функцию агрегирования ко всем элементам, возвращает одно значение.
Пример кода
//Flow reduce fun main() = runBlocking { val result = (1..5).asFlow() .reduce { accumulator, value -> accumulator + value } println("Flow Reduce Result: $result") }
//RxJava reduce fun main() { Observable.fromIterable(listOf(1, 2, 3, 4, 5)) .reduce { accumulator, value -> accumulator + value } .subscribe { result -> println("RxJava Reduce Result: $result") } }
-
count:
Используются для подсчета количества элементов в потоке, которые удовлетворяют определенному условию (или всех элементов, если условие отсутствует)
RxJava count / Flow count:
Используются для подсчета количества элементов в потоке, которые удовлетворяют определенному условию (или всех элементов, если условие отсутствует)
Пример кода
//Flow count fun main() = runBlocking { val count = (1..10).asFlow() .count { it % 2 == 0 } // Считаем количество четных чисел println("Flow Count Result: $count") }
//RxJava count fun main() { Observable.fromIterable(1..10) .filter { it % 2 == 0 } // Отфильтруем четные числа .count() // Подсчитываем количество элементов после фильтрации .subscribe { count -> println("RxJava Count Result: $count") } }
Управление временем
RxJava debounce / Flow debounce:
Пропускает элементы, если они генерируются слишком быстро.
Пример кода
//RxJava debounce import io.reactivex.rxjava3.core.Observable import java.util.concurrent.TimeUnit fun main() { Observable.interval(100, TimeUnit.MILLISECONDS) .take(10) .debounce(150, TimeUnit.MILLISECONDS) // Испускает только элементы, //за которыми следует 150мс тишины .subscribe(::println) Thread.sleep(2000) }
//Flow debounce import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking { (1..10).asFlow() .onEach { delay(100L) } .debounce(150L) // Испускает только элементы, //за которыми следует 150мс тишины .collect { println(it) } }
RxJava delay / Flow — другой подход
В RxJava Задерживает эмиссию элементов на указанное время. В Flow используется delay из kotlinx.coroutines.delay внутри построителя flow
или внутри операторов.
Пример кода
//RxJava delay fun main() { val startTime = System.currentTimeMillis() Observable.range(1, 5) .concatMap { item -> Observable.just(item) .delay(1000, TimeUnit.MILLISECONDS) // Задержка каждого элемента } .subscribe { value -> println("RxJava emitted $value at ${System.currentTimeMillis() - startTime} ms") } // Чтобы приложение не завершилось раньше времени Thread.sleep(6000) }
//Flow: delay в onEach fun main() = runBlocking { val startTime = System.currentTimeMillis() (1..5).asFlow() .onEach { delay(1000) } // Задержка перед каждым элементом .collect { value -> println("Flow emitted $value " + "at ${System.currentTimeMillis() - startTime} ms") } }
Надеюсь статья дала Вам более развернутое понимание, как сопоставляются различые операторы в RxJava и Kotlin Flow и поможет более быстро мигрировать с одоного на другое. Так же в статье постарался использовать много временЫх диаграмм для лучшего поимания принципов работы оператора.
Спасибо за внимание!
ссылка на оригинал статьи https://habr.com/ru/articles/856106/
Добавить комментарий