Сравнение операторов RxJava 3 и Kotlin Coroutines Flow

от автора

Привет, Хабр! Меня зовут Константинов Александр, я Android-разработчик в «Студии Олега Чулакова». Сегодня мы сравим операторы RxJava 3 и Flow. Статья будет полезна как для изучения операторов, так и для более легкого перехода с RxJava на Flow. Буду рад вашему фидбеку и комметариям.

Ну что ж, давайте начинать!

В этой статье рассмотрим только самые популярые операторы и примеры кода:

Операторы преобразования

RxJava map / Flow map:

Преобразует элементы, применяя функцию к каждому из них. Работает последовательно.

Flow map

Flow map
Пример кода
//RxJava map fun main() {   Observable.range(1, 5)     .map { it * 2 }     .subscribe(::println) // Вывод: 2, 4, 6, 8, 10 }
//Flow map fun main() = runBlocking {   (1..5).asFlow()     .map { it * 2 }     .collect { println(it) } // Вывод: 2, 4, 6, 8, 10 }

RxJava flatMap / Flow flatMapMerge:

Преобразует каждое значение в поток. RxJava flatMap / Flow flatMapMerge , в отличие от RxJava concatMap / Flow flatMapConcat , обрабатывает элементы исходного потока параллельно, а не последовательно. Потоки для каждого элемента запускаются одновременно и эмитят значения в порядке их готовности, а не поочередно, поэтому порядок значений может отличаться и не соответствовать исходному. Следует использовать, когда порядок обработки не критичен и можно выиграть в производительности от параллельной обработки.

Flow flatMapMerge

Flow flatMapMerge
Пример кода
//RxJava flatMap fun main() {     Observable.just(1, 2, 3)         .flatMap { i ->             Observable.just(i * 10)                 .concatWith(Observable.just(i * 20).delay(10, TimeUnit.MILLISECONDS))         }         .blockingSubscribe(::println) }
//Flow flatMapMerge fun main() = runBlocking {   flowOf(1, 2, 3)     .flatMapMerge { flow { emit(it * 10); delay(10); emit(it * 20) } }     .collect { println(it) } }

RxJava concatMap / Flow flatMapConcat:

Преобразует каждое значение в поток и объединяет их последовательно. Берет каждый элемент из исходного потока и, для каждого элемента, создает новый поток. Эти потоки затем объединяются в один линейный поток. Важно отметить, что объединение происходит поочередно: следующий поток начинается только после того, как завершен предыдущий.

Flow flatMapConcat

Flow flatMapConcat
Пример кода
//RxJava concatMap fun main() {   Observable.just(1, 2, 3)     .concatMap { i -> Observable.just(i * 10, i * 20) }     .subscribe(::println) // Вывод: 10, 20, 20, 40, 30, 60 }
//Flow flatMapConcat fun main() = runBlocking {   flowOf(1, 2, 3)     .flatMapConcat { flowOf(it * 10, it * 20) }     .collect { println(it) } // Вывод: 10, 20, 20, 40, 30, 60 }

RxJava buffer / Flow buffer:

Группирует элементы в списки фиксированного размера.

Используется для обработки элементов в буферах (или пакетами), что позволяет повысить производительность, особенно в случаях, когда обработка каждого элемента по отдельности блокирует лишние ресурсы.

Пример кода
//RxJava buffer fun main() {   Observable.just(1, 2, 3, 4, 5)     .doOnNext { println("Emitting $it") }     .buffer(2) // Буферизуем элементы по 2     .subscribe { buffer ->       println("Collected $buffer")     }   Thread.sleep(1000) }
//Flow buffer fun main() = runBlocking {   flowOf(1, 2, 3, 4, 5)     .onEach { println("Emitting $it") }     .buffer() // Буферизуем все элементы     .collect { value ->       println("Collected $value")     } }

RxJava scan / Flow scan:

Последовательно применяет функцию к элементам, возвращая промежуточные результаты. Используется для накопления значений с возможностью учета предыдущих накопленных значений.

Flow scan

Flow scan
Пример кода
//RxJava scan fun main() {     Observable.fromArray(1, 2, 3)         .scan(0) { accumulator, value ->             accumulator + value         }         .subscribe { result ->             println(result) // Вывод: 0, 1, 3, 6         } }
//Flow scan fun main() = runBlocking<Unit> {     // Создаем Flow от 1 до 3     val flow = (1..3).asFlow()      // Применяем оператор scan     flow.scan(0) { accumulator, value ->         accumulator + value     }.collect { result ->         println(result) // Вывод: 0, 1, 3, 6     } }

RxJava groupBy / Аналог на Flow отсутствует:

Разбивает поток на несколько потоков, сгруппированных по ключам.

Пример кода
fun main() {   val observable = Observable.just(     "one", "two", "three", "four", "five", "six"   )    observable.groupBy { it.length }     .flatMapSingle { group ->       group.toList().map { list -> Pair(group.key, list) }     }     .subscribe { pair ->       println("Length: ${pair.first}, Words: ${pair.second}")     } }

Фильтрация

RxJava distinctUntilChanged / Flow distinctUntilChanged

Исключает повторяющиеся последовательные элементы.

Flow distinctUntilChanged

Flow distinctUntilChanged
Пример кода
//RxJava distinctUntilChanged fun main() {   Observable.just(1, 1, 2, 2, 3, 1)     .distinctUntilChanged()     .subscribe(::println) // Вывод: 1, 2, 3, 1 }
//Flow distinctUntilChanged fun main() = runBlocking {   flowOf(1, 1, 2, 2, 3, 1)     .distinctUntilChanged()     .collect { println(it) } // Вывод: 1, 2, 3, 1 }

RxJava filter / Flow filter:

Отбирает элементы, соответствующие условию.

Flow filter

Flow filter
Пример кода
//RxJava filter fun main() {   Observable.range(1, 5)     .filter { it % 2 == 0 }     .subscribe(::println) // Вывод: 2, 4 }
//Flow filter fun main() = runBlocking {   (1..5).asFlow()     .filter { it % 2 == 0 }     .collect { println(it) } // Вывод: 2, 4 }

RxJava take / Flow take:

Берет первые N элементов из входящего потока.

Flow take

Flow take
Пример кода
//RxJava take fun main() {   Observable.range(1, 10)     .take(3)     .subscribe(::println) // Вывод: 1, 2, 3 }
//Flow take fun main() = runBlocking {   (1..10).asFlow()     .take(3)     .collect { println(it) } // Вывод: 1, 2, 3 }

RxJava skip / Flow drop:

Пропускает первые N элементов.

Flow drop

Flow drop
Пример кода
//Flow drop fun main() = runBlocking {   flowOf(1, 2, 3, 4, 5)     .drop(2) // Пропускаем первые 2 элемента     .collect { value ->       println(value) // Выведет 3, 4, 5     } }
//RxJava skip fun main() {   Observable.just(1, 2, 3, 4, 5)     .skip(2) // Пропускаем первые 2 элемента     .subscribe { value ->       println(value) // Выведет 3, 4, 5     } }

Комбинирование

RxJava merge / Flow merge:

Объединяет заданные потоки в один поток. Все потоки объединяются одновременно, без ограничения на количество одновременно собираемых потоков.

Flow merge

Flow merge
Пример кода
//Flow merge fun main() = runBlocking {   val flow1 = flow {     repeat(3) {        emit("From flow1: $it")       delay(500L) // Эмиссия каждые 500 мс     }   }    val flow2 = flow {     repeat(3) {        emit("From flow2: $it")       delay(1000L) // Эмиссия каждые 1000 мс     }   }    val startTime = System.currentTimeMillis()   merge(flow1, flow2).collect { value ->     println("$value at ${System.currentTimeMillis() - startTime} ms")   } }
//RxJava merge fun main() {     val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)         .take(3)         .map { "From observable1: $it" }      val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)         .take(3)         .map { "From observable2: $it" }      val startTime = System.currentTimeMillis()     Observable.merge(observable1, observable2)         .subscribe { value ->             println("$value at ${System.currentTimeMillis() - startTime} ms")         }      // Ожидание завершения потоков     Thread.sleep(4000) }

RxJava zip / Flow zip:

Объединяет потоки, применяя функцию комбинирования к элементам. Элемент для которого нет пары не испускается.

Flow zip

Flow zip
Пример кода
//RxJava zip fun main() {   val nums = Observable.range(1, 3)   val letters = Observable.just("A", "B", "C", "D")      Observable.zip(nums, letters) { n, l -> "$n -> $l" }     .subscribe(::println)    // Вывод: 1 -> A, 2 -> B, 3 -> C //D - не эмиттится }
//Fllow zip fun main() = runBlocking {   val numbers = (1..3).asFlow()   val letters = flowOf("A", "B", "C", "D")   numbers.zip(letters) { n, l -> "$n -> $l" }     .collect { println(it) }    // Вывод: 1 -> A, 2 -> B, 3 -> C //D - не эмиттится }

RxJava combineLatest / Flow combine:

Объединяет потоки, используя последние элементы из каждого.

Flow combine

Flow combine
Пример кода
//Flow combine fun main() = runBlocking {     val flow1 = flow {         repeat(3) {             emit("String: s$it")             delay(500L) // Эмиссия каждые 500 мс         }     }      val flow2 = flow {         repeat(3) {             emit(it)             delay(1000L) // Эмиссия каждые 1000 мс         }     }      flow1.combine(flow2) { str, num ->         "$str and Int: $num"     }.collect { value ->         println(value)     } }
//RxJava combineLatest fun main() {     val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)         .take(3)         .map { "String: s$it" }      val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)         .take(3)      Observable.combineLatest(observable1, observable2) { str: String, num: Long ->         "$str and Int: $num"     }         .subscribe { value ->             println(value)         }     Thread.sleep(5000L) }

Агрегация

RxJava reduce / Flow reduce:

В Kotlin Flow оператор reduce аналогичен одноименному оператору в RxJava. Оба используются для преобразования коллекции значений в одиночное значение, используя начальное значение и функцию агрегирования (аккумулятора). Применяет функцию агрегирования ко всем элементам, возвращает одно значение.

Пример кода
//Flow reduce fun main() = runBlocking {   val result = (1..5).asFlow()     .reduce { accumulator, value ->       accumulator + value     }    println("Flow Reduce Result: $result") }
//RxJava reduce fun main() {     Observable.fromIterable(listOf(1, 2, 3, 4, 5))         .reduce { accumulator, value ->             accumulator + value         }         .subscribe { result ->             println("RxJava Reduce Result: $result")         } }
  • count:

    Используются для подсчета количества элементов в потоке, которые удовлетворяют определенному условию (или всех элементов, если условие отсутствует)

RxJava count / Flow count:

Используются для подсчета количества элементов в потоке, которые удовлетворяют определенному условию (или всех элементов, если условие отсутствует)

Пример кода
//Flow count fun main() = runBlocking {     val count = (1..10).asFlow()         .count { it % 2 == 0 } // Считаем количество четных чисел      println("Flow Count Result: $count") }
//RxJava count fun main() {     Observable.fromIterable(1..10)         .filter { it % 2 == 0 } // Отфильтруем четные числа         .count() // Подсчитываем количество элементов после фильтрации         .subscribe { count ->             println("RxJava Count Result: $count")         } }

Управление временем

RxJava debounce / Flow debounce:

Пропускает элементы, если они генерируются слишком быстро.

Flow debounce

Flow debounce
Пример кода
//RxJava debounce import io.reactivex.rxjava3.core.Observable import java.util.concurrent.TimeUnit  fun main() {   Observable.interval(100, TimeUnit.MILLISECONDS)     .take(10)     .debounce(150, TimeUnit.MILLISECONDS) // Испускает только элементы,    //за которыми следует 150мс тишины     .subscribe(::println)    Thread.sleep(2000) }
//Flow debounce import kotlinx.coroutines.* import kotlinx.coroutines.flow.*  fun main() = runBlocking {   (1..10).asFlow()     .onEach { delay(100L) }     .debounce(150L) // Испускает только элементы,    //за которыми следует 150мс тишины     .collect { println(it) }  }

RxJava delay / Flow — другой подход

В RxJava Задерживает эмиссию элементов на указанное время. В Flow используется delay из kotlinx.coroutines.delay внутри построителя flow или внутри операторов.

Пример кода
//RxJava delay fun main() {   val startTime = System.currentTimeMillis()    Observable.range(1, 5)     .concatMap { item ->       Observable.just(item)         .delay(1000, TimeUnit.MILLISECONDS) // Задержка каждого элемента     }     .subscribe { value ->       println("RxJava emitted $value at ${System.currentTimeMillis() - startTime} ms")     }    // Чтобы приложение не завершилось раньше времени   Thread.sleep(6000) }
//Flow: delay в onEach fun main() = runBlocking {     val startTime = System.currentTimeMillis()      (1..5).asFlow()         .onEach { delay(1000) } // Задержка перед каждым элементом         .collect { value ->             println("Flow emitted $value " +                     "at ${System.currentTimeMillis() - startTime} ms")         } }

Надеюсь статья дала Вам более развернутое понимание, как сопоставляются различые операторы в RxJava и Kotlin Flow и поможет более быстро мигрировать с одоного на другое. Так же в статье постарался использовать много временЫх диаграмм для лучшего поимания принципов работы оператора.

Спасибо за внимание!


ссылка на оригинал статьи https://habr.com/ru/articles/856106/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *