В этой статье мы рассмотрим, как и почему изменилась реализация примитивов синхронизации из стандартной библиотеки Java и пакета java.util.concurrent для Kotlin Coroutines и для языка Kotlin в целом. Сразу хочу предупредить: рассматриваемые в статье библиотеки и классы будут оцениваться не с точки зрения поддержки legacy-функциональности и возможности использовать их в Java, а с точки зрения эффективности и возможности использовать их в корутинах и Kotlin Multiplatform. Поэтому эта статья будет больше полезна тем, кто собирается писать новые проекты на языке Kotlin.
В рамках данной статьи будут рассмотрены:
-
критические секции;
-
атомарные переменные;
-
реактивные переменные;
-
барьерная синхронизация.
Критические секции
Здесь мы не будем очень подробно останавливаться на стандартных реализациях критической секции в языке Java, таких как synchronized и ReentrantLock. Написано множество статей на эту тему, и я уверен, они всем известны. Рассмотрим, почему их использование для синхронизации корутин нежелательно, и приведем альтернативу.
synchronized / @Synchronized
Разработаем простой тест, где будем на разных потоках делать инкремент переменной value:
fun main() = runBlocking { var obj = Any() var value = 0 Array(100_000) { async(Dispatchers.Default) { synchronized(obj) { ++value } } }.forEach { it.await() } assertEquals(100_000, value) }
Данный код запускает 100 000 параллельно выполняющихся корутин (настолько параллельно, насколько это позволяет количество потоков в Dispatchers.Default). В данном примере синхронизация блоком synchronized будет работать корректно, и тест будет пройден. Модифицируем этот код, добавив suspend функцию delay внутрь блока synchronized:
... synchronized(obj) { delay(1) ++value } ...
Данный код даже не скомпилируется с ошибкой:
The ‘delay’ suspension point is inside a critical section
Все дело в том, что мы не можем вызывать никакие suspend функции внутри блока синхронизации, потому что текущий поток корутины должен уметь освобождаться для использования в других корутинах в момент начала активного ожидания. Стандартные реализации критической секции не рассчитаны на то, что в середине блока синхронизации поток может освободиться для других задач, а после ожидания suspend функции выполнение может быть продолжено на совершенно другом потоке.
ReentrantLock
В примере выше нас спас компилятор. Давайте посмотрим, что произойдет при использовании библиотечной реализации критической секции, например, ReentrantLock:
... val lock = ReentrantLock() ... lock.lock() try { delay(1) ++value } finally { lock.unlock() } ...
Такой код скомпилируется, но синхронизация не будет работать, а может, даже произойдет вылет с IllegalMonitorStateException. Что интересно, если в данном примере использовать lock.withLock { ... }, мы получим уже знакомую нам ошибку компиляции.
То же самое касается и полезнейшего класса ReentrantReadWriteLock, который позволяет экономить на операциях read after read. К сожалению, для синхронизации корутин он не будет работать по той же причине.
Mutex
В примерах с synchronized/ReentrantLock и suspend функциями нас мог кое-где спасти компилятор, а в худшем случае мы бы получили несинхронизированный код с вероятным вылетом в рантайме. К счастью для нас, разработчики корутин подумали о таком сценарии и специально для этого разработали интерфейс Mutex и его стандартную имплементацию. Модифицируем код при помощи Mutex:
... val mutex = Mutex() ... mutex.withLock { delay(1) ++value } ...
Такой код отработает корректно: операции над переменной value синхронизированы, suspend функция delay не мешает синхронизации.
К сожалению, класс ReadWriteMutex (аналогичный ReentrantReadWriteLock) по состоянию на июль 2023 года еще не добавили, однако обсуждение давно ведется, и есть уже минимум одна реализация в PullRequest к GitHub Kotlin/kotlinx.coroutines.
Вы можете сказать, что для данного примера синхронизация единственной переменной value при помощи критической секции — не самое концептуально правильное решение. И будете правы, поэтому синхронизацию одиночных переменных рассмотрим в следующем разделе.
Атомарные переменные
Начнем этот раздел со спойлера: volatile (в котлине @Volatile) и классы пакета java.util.concurrent.atomic (например AtomicReference, AtomicInteger…) при правильном применении будут работать с корутинами не хуже, чем со стандартными Java потоками или какими-нибудь многопоточными фреймворками.
volatile / @Volatile
Для начала вспомним, для чего нам вообще нужен @Volatile. Разные потоки для ускорения работы могут кэшировать у себя значения глобальных переменных. Из-за чего в других потоках при обращении к этим переменным мы не всегда будем видеть их актуальное значение. Рассмотрим код, иллюстрирующий суть проблемы:
@Volatile var value1 = true @Volatile var value2 = true fun main() = runBlocking { repeat(1_000) { value1 = true value2 = true val job1 = async(Dispatchers.Default) { value2 = false while(value1); } val job2 = async(Dispatchers.Default) { value1 = false while(value2); } job1.await() job2.await() } println("Success!") }
Это правильный код, который успешно выполнится, но, если убрать аннотацию @Volatile хотя бы с одной переменной, то тест бесконечно зависнет в одном из циклов while, потому что поток корутины использует кэшированное значение общей переменной.
Однако и @Volatile можно использовать неправильно. Рассмотрим другой пример:
@Volatile var value = true fun main() = runBlocking { Array(10_000) { async(Dispatchers.Default) { value = !value } }.forEach { it.await() } assertTrue(value) }
В данном тесте запускается 10 000 конкурирующих корутин, которые меняют значение value на противоположное. Так как количество корутин чётное, итоговое значение не должно измениться и должно остаться равным true. Однако в половине случаев тест падает, даже несмотря на @Volatile. Дело в том, что операция value=!value — неатомарная, то есть состоит из нескольких операций: чтение value, потом запись нового value. Если неатомарные операции часто вызываются в любой многопоточной среде, есть риск получить неправильное значение переменной.
java.util.concurrent.atomic
Большую гибкость по сравнению с @Volatile дают классы пакета java.util.concurrent.atomic, которые позволяют делать атомарными операции инкремента, декремента, compareAndSet (атомарно сравнить с предыдущим значением и записать), getAndSet (атомарное получение старого значения и запись нового) и ряд других. Рассмотрим как можно более эффективно и просто синхронизировать уже знакомый нам код из раздела про критические секции:
fun main() = runBlocking { var value = 0 Array(100_000) { async(Dispatchers.Default) { ++value } }.forEach { it.await() } assertEquals(100_000, value) }
В данном примере кода переменная value не синхронизирована, поэтому значение к концу теста value=100_000 не гарантировано. Так как инкремент по умолчанию — неатомарная операция (++value — это по сути value=value+1 чтение и запись), то и @Volatile тут не поможет. Однако, если модифицировать этот код с помощью класса AtomicInteger:
fun main() = runBlocking { var value = AtomicInteger(0) Array(100_000) { async(Dispatchers.Default) { value.incrementAndGet() } }.forEach { it.await() } assertEquals(100_000, value.get())}
Благодаря атомарности операции incrementAndGet в классе AtomicInteger синхронизация начинает работать правильно. Если задача не ограничивается настолько простыми операциями, и нужно синхронизировать целые участки кода, на помощь приходят уже рассмотренные нами критические секции.
kotlinx.atomicfu
Как я писал в начале раздела, если использовать volatile и классы пакета java.util.concurrent.atomic правильно, то они будет корректно работать и в корутинах. Тогда зачем же нужно что-то еще? — спросите вы.
Дело в том, что этот тезис касается JVM и Android, однако Kotlin и корутины уже давно вышли за пределы этих двух платформ. Если вы уже разрабатываете на Kotlin Multiplaform или потенциально планируете переход, то без kotlinx.atomicfu вам не обойтись:
fun main() = runBlocking { var value = atomic(0) Array(100_000) { async(Dispatchers.Default) { value.incrementAndGet() } }.forEach { it.await() } assertEquals(100_000, value.value) }
В любом случае, переходя на kotlinx.atomicfu для JVM и Android, вы ничего не теряете — все функции библиотеки помечены как actual и внутри используют тот же пакет java.util.concurrent.atomic.
Реактивные переменные
Атомарные переменные — это хорошо, но часто необходимо также подписываться на изменение переменной и выполнять в обработчике какой-то код. В этом разделе мы рассмотрим реализации реактивных переменных, то есть классов, которые позволяют:
-
через generic задать тип значения;
-
записывать значение синхронно;
-
получать значение синхронно;
-
хранить это значение как внутреннее состояние;
-
подписываться на изменения значения.
Реализации, не обладающие всеми этими свойствами, в данном разделе рассмотрены не будут.
LiveData
Первое решение, которое мы рассмотрим, — это LiveData. Уверен, многие Android-разработчики не раз сталкивались с этим классом. LiveData позволяет хранить значение, получать и менять его напрямую через геттер\сеттер, подписываться на изменение значения, привязывать подписку к LifecycleOwner.
Очень полезный класс, долгое время спасавший Android-разработчиков, работает с Android Data Binding и с Jetpack Compose, но не без недостатков. Их мы сейчас и рассмотрим.
Первый недостаток очевиден из того, что я писал выше: класс LiveData — штука специфичная и доступна только для платформы Android, нигде больше использовать его не получится.
Из первого вытекает и второй недостаток: основные методы setValue \ observe должны вызываться только на Main потоке приложения (Dispatchers.Main), иначе будет вылет в рантайме.
val liveData = MutableLiveData(true) withContext(Dispatchers.Default) { // IllegalStateException: Cannot invoke setValue on a background thread liveData.value = false }
Конечно, если использовать LiveData по основному назначению, например, для связи ViewModel + Fragment\Activity, скорее всего, никаких проблем не возникнет. Однако мой опыт говорит о том, что порой разработчики пытаются использовать этот класс еще и на уровне интеракторов\репозиториев и вообще повсюду, где может понадобиться реактивность, что порождает вылеты и замедление Main потока лишними вычислениями. Есть, конечно, метод postValue, который можно вызывать на любом потоке, но не без нюансов:
val liveData = MutableLiveData(true) withContext(Dispatchers.Default) { println(liveData.value) // Prints true liveData.postValue(false) println(liveData.value) // Also prints true }
Значение false запланировано для записи в liveData на Main потоке, но сразу значение false мы не увидим.
Еще один недостаток: LiveData не дает такой гибкости как, например, RxJava с ее методами преобразования вроде map, filter, distinct, debounce и др. Ну это и понятно: LiveData — это не полноценный фреймворк, а класс, созданный для решения конкретных проблем.
Ну и последнее. Значение LiveData всегда может быть null. Даже если в Kotlin явно указать тип значения без вопроса, например, LiveData<Int>, это не мешает нам записать и считать null.
BehaviorSubject
Похожая на LiveData штука — BehaviorSubject из RxJava. Этот класс хранит значение, позволяет его менять, получать и подписываться на него. В то же время он дает возможность пользоваться всей функциональной мощью RxJava и может применяться для JVM, а не только для платформы Android. Еще один неоспоримый плюс: методы getValue и onNext (по сути setValue) синхронизированы и могут свободно использоваться в любой многопоточной среде, а во время подписки можно выбирать поток обработки с помощью метода observeOn.
Из явных минусов можно выделить то, что nullable значения при записи в BehaviorSubject нужно упаковывать в какой-то буферный класс (а потом распаковывать при получении), потому что null не является валидным значением для этого класса.
Так как в этой статье мы рассматриваем в качестве основного фреймворка для многопоточной разработки именно корутины и Coroutines Flow, было бы странно, особенно для новых проектов, добавлять в проект еще и библиотеку RxJava. Поэтому разработчики корутин разработали собственную реализацию реактивных переменных, которую мы рассмотрим в следующем пункте.
StateFlow
Интерфейс StateFlow и его реализация являются частью Coroutines Flow и решают все проблемы LiveData и BehaviorSubject. Класс хранит значение, позволяет его получать, менять, а также подписываться на его изменение, как и в уже рассмотренных выше классах. Однако есть и ряд преимуществ:
-
Значение value (запись и получение) синхронизировано и может использоваться в любой многопоточной среде.
-
Возможность записи\чтения null контролируется на уровне языка через generics. Если объявить StateFlow<Int?>, то value будет nullable, а если StateFlow<Int>, то нет.
-
StateFlow включает в себя все преимущества Coroutines Flow и возможность вызова suspend функций внутри преобразующих методов или прямо внутри collect.
-
При подписке на изменения есть возможность привязки к LifecycleOwner (через lifecycleScope) для Android.
-
Работает вместе с Android Data Binding, Jetpack Compose и даже Compose Multiplatform.
-
StateFlow работает не только для Android и JVM, но еще и для других платформ фреймворка Kotlin Multiplatform.
StateFlow — достаточно мощный и универсальный инструмент, который может использоваться для широкого спектра задач, таких как: связь ViewModel и View в архитектуре MVVM, хранение изменяющихся данных на уровне репозиториев с возможностью подписки на них, преобразование данных через функции Coroutines Flow и даже конструирование примитивов синхронизации, на чем мы подробно остановимся в следующем разделе.
Барьерная синхронизация
В данном разделе будет больше креатива — в корутинах пока еще нет стандартных реализаций барьерной синхронизации, поэтому нам придется самим написать этот код. Обычно под барьерной синхронизацией понимается следующее: несколько разных потоков ждут какого-то события, а когда оно происходит, потоки одновременно выходят из ожидания и продолжают свою работу. В случае с корутинами будет примерно то же самое, но ждать события будут не потоки, а, собственно, корутины. Значит, и метод ожидания будет suspend функцией. Выразим это через интерфейс:
interface Barrier { suspend fun await() @Throws(TimeoutCancellationException::class) suspend fun await(timeout: Duration) }
Очень простой интерфейс всего с двумя функциями: одна ждет с таймаутом, вторая без. Далее в этом разделе мы будем реализовывать недостающие нам классы барьерной синхронизации из пакета java.util.concurrent при помощи этого интерфейса.
CountDownLatch
Класс CountDownLatch из пакета java.util.concurrent действует следующим образом: в одних потоках мы уменьшаем счетчик на 1, а в других потоках ожидаем, пока счетчик не станет равным 0. Напомним функциональность этого класса:
-
задаем счетчик один раз в конструкторе, значение счетчика больше или равно нулю;
-
уменьшение счетчика на единицу (метод countDown);
-
получение значения счетчика синхронно (метод getCount);
-
ожидание обнуления счетчика с таймаутом (метод await).
Достаточно простой и полезный класс, но есть проблема, которая не позволяет эффективно использовать этот класс в корутинах. Проблема — в методе ожидания обнуления счетчика (метод await). Он не является suspend функцией и блокирует поток, а таймаут вызывает InterruptedException. Из-за этого эффективность корутин падает: текущий поток корутины не освобождается для задач в других корутинах. Также страдает и отменяемость: при вызове метода cancel (на текущей корутине или на всем scope целиком) корутина, зависшая в блокирующем методе await, не сможет отмениться, по крайней мере не сразу.
StateFlow — универсальный и мощный класс, который позволяет нам написать собственную реализацию CountDownLatch меньше чем за 50 строчек кода.
class CountDownBarrier(count: UInt) : Barrier { private val stateFlow = MutableStateFlow(count) val counterValue: UInt @Synchronized get() = stateFlow.value @Synchronized fun countDown() { if (stateFlow.value > 0u) { --stateFlow.value } } override suspend fun await() { internalAwait() } @Throws(TimeoutCancellationException::class) override suspend fun await(timeout: Duration) { if (counterValue > 0u) { withTimeout(timeout) { internalAwait() } } } private suspend fun internalAwait() { if (counterValue > 0u) { // Await first value lower than 0 (suspend function). stateFlow.first { it <= 0u } } } }
В данной реализации вся изначальная функциональность CountDownLatch сохранилась, зато метод await теперь является suspend функцией. Использование @Synchronized тут оправдано, потому что должна быть возможность вызывать метод countDown и геттер на counterValue и вне корутин. Кроме того, внутри блоков @Synchronized никаких suspend функций не вызывается. Разработаем тест для класса CountDownBarrier:
fun main() = runBlocking { val сountDownBarrier = CountDownBarrier(5u) val awaitTasks = Array(10) { index -> async(Dispatchers.Default) { сountDownBarrier.await() println("Await finished: index=$index, counter=${сountDownBarrier.counterValue}") } } val countDownTasks = Array(1000) { index -> async(Dispatchers.Default) { println("Count down started: index=$index, counter=${сountDownBarrier.counterValue}") сountDownBarrier.countDown() println("Count down finished: index=$index, counter=${сountDownBarrier.counterValue}") } } awaitTasks.forEach { it.await() } countDownTasks.forEach { it.await() } println("Success: counter=${сountDownBarrier.counterValue}") }
Счетчик CountDownBarrier задан на 5, значит, не раньше чем через 5 вызовов метода countDown мы должны увидеть первые логи Await finished. Посмотрим, что выведет этот код:
Count down started: index=0, counter=5
Count down started: index=0, counter=5
Count down finished: index=0, counter=4
Count down started: index=1, counter=4
Count down finished: index=1, counter=3
Count down started: index=2, counter=3
Count down finished: index=2, counter=2
Count down started: index=3, counter=2
Count down finished: index=3, counter=1
Count down started: index=4, counter=1
Count down finished: index=4, counter=0
Await finished: index=9, counter=0
Await finished: index=8, counter=0
…
Счетчик обнулился, и только после этого ожидающие корутины среагировали на это и вышли из await, значит, класс CountDownBarrier работает корректно.
CyclicBarrier
Класс CyclicBarrier похож на CountDownLatch, но значение внутреннего счетчика меняется не публичным методом countDown, а в зависимости от количества ожидающих потоков. Когда количество ожидающих потоков становится равно заданному значению, все они одновременно выходят из ожидания. После освобождения всех потоков внутренний счетчик возвращается в начальное значение. Реализуем примитив барьера для корутин при помощи StateFlow:
class CoroutinesBarrier( val initialCoroutinesCount: UShort ) : Barrier { // @Volatile is not necessary here: this field is only used in @Synchronized blocks. private var stateFlow = MutableStateFlow(initialCoroutinesCount) val countLeftToReleaseBarrier: UShort @Synchronized get() = stateFlow.value override suspend fun await() { internalAwait() } @Throws(TimeoutCancellationException::class) override suspend fun await(timeout: Duration) { withTimeout(timeout) { internalAwait() } } private suspend fun internalAwait() { val (flowToAwait, countLeftToRelease) = countDownOrResetBarrier() if (countLeftToRelease > 0u) { // Await first value lower than 0 (suspend function). flowToAwait.first { it <= 0u } } } @Synchronized private fun countDownOrResetBarrier(): Pair<Flow<UShort>, UShort> { if (stateFlow.value > 0u) { --stateFlow.value } val result = stateFlow to stateFlow.value // Reset flow right before releasing awaiting coroutines. if (stateFlow.value <= 0u) { stateFlow = MutableStateFlow(initialCoroutinesCount) } return result } }
В отличие от CyclicBarrier в конструкторе мы задаем не количество потоков, а количество корутин, при котором барьер освобождается. Чтобы можно было сбрасывать значение счетчика в начальное значение, stateFlow объявлена как var. При сбросе счетчика мы записываем в нее новый экземпляр MutableStateFlow. Дополнительная синхронизация (например, @Volatile) на переменную stateFlow не нужна, так как все обращения к этой переменной происходят только внутри блоков @Synchronized. Разработаем тест для класса CoroutinesBarrier:
fun main() = runBlocking { val startTimeMillis = System.currentTimeMillis() val coroutinesBarrier = CoroutinesBarrier(3u) val awaitTasks = Array(6) { index -> async(Dispatchers.Default) { println("Coroutine started.") delay(index * 100L) coroutinesBarrier.await(5_000.milliseconds) val durationMillis = System.currentTimeMillis() - startTimeMillis println("Coroutine released: duration=$durationMillis.") } }.forEach { it.await() } val durationMillis = System.currentTimeMillis() - startTimeMillis println("Test finished: duration=$durationMillis.") }
В этом тесте мы запускаем шесть корутин с увеличивающейся задержкой (100, 200 … 600 миллисекунд). Сразу после задержки выставлен барьер на три корутины. Посмотрим, что выведет тест:
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine released: duration=285.
Coroutine released: duration=285.
Coroutine released: duration=285.
Coroutine released: duration=583.
Coroutine released: duration=583.
Coroutine released: duration=583.
Test finished: duration=583.
Из результатов видно, что барьер работает. Корутины выходили из барьера по три штуки: первые три — спустя 285 миллисекунд, вторые три — спустя 583 миллисекунды. Однако, если в этом тесте увеличить количество корутин с шести до семи, то вместо вывода Test finished программа вылетит с ошибкой спустя пять секунд:
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 5000 ms
Это логично, так как семь на три не делится нацело, значит две пачки корутин по три штуки пройдут барьер, а одна последняя корутина будет ждать до таймаута.
Заключение
В данной статье мы рассмотрели, как правильно применять прототипы синхронизации потоков в контексте корутин, выделили плюсы и минусы существующих решений, а также написали собственную реализацию примитивов барьерной синхронизации для корутин. В следующей статье мы разберем реактивные потоки, раздельный доступ, семафоры и акторы.
ссылка на оригинал статьи https://habr.com/ru/companies/garage8/articles/747948/
Добавить комментарий