От потоков к корутинам: как и почему видоизменились примитивы синхронизации в языке Kotlin (Часть 1)

от автора

В этой статье мы рассмотрим, как и почему изменилась реализация примитивов синхронизации из стандартной библиотеки Java и пакета java.util.concurrent для Kotlin Coroutines и для языка Kotlin в целом. Сразу хочу предупредить: рассматриваемые в статье библиотеки и классы будут оцениваться не с точки зрения поддержки legacy-функциональности и возможности использовать их в Java, а с точки зрения эффективности и возможности использовать их в корутинах и Kotlin Multiplatform. Поэтому эта статья будет больше полезна тем, кто собирается писать новые проекты на языке Kotlin.

В рамках данной статьи будут рассмотрены:

  • критические секции;

  • атомарные переменные;

  • реактивные переменные;

  • барьерная синхронизация.

Критические секции

Здесь мы не будем очень подробно останавливаться на стандартных реализациях критической секции в языке Java, таких как synchronized и ReentrantLock. Написано множество статей на эту тему, и я уверен, они всем известны. Рассмотрим, почему их использование для синхронизации корутин нежелательно, и приведем альтернативу.

synchronized / @Synchronized

Разработаем простой тест, где будем на разных потоках делать инкремент переменной value:

fun main() = runBlocking {     var obj = Any()     var value = 0      Array(100_000) {         async(Dispatchers.Default) {             synchronized(obj) {                 ++value             }         }     }.forEach { it.await() }      assertEquals(100_000, value) }

Данный код запускает 100 000 параллельно выполняющихся корутин (настолько параллельно, насколько это позволяет количество потоков в Dispatchers.Default). В данном примере синхронизация блоком synchronized будет работать корректно, и тест будет пройден. Модифицируем этот код, добавив suspend функцию delay внутрь блока synchronized:

... synchronized(obj) {     delay(1)     ++value } ...

Данный код даже не скомпилируется с ошибкой:

The ‘delay’ suspension point is inside a critical section

Все дело в том, что мы не можем вызывать никакие suspend функции внутри блока синхронизации, потому что текущий поток корутины должен уметь освобождаться для использования в других корутинах в момент начала активного ожидания. Стандартные реализации критической секции не рассчитаны на то, что в середине блока синхронизации поток может освободиться для других задач, а после ожидания suspend функции выполнение может быть продолжено на совершенно другом потоке.

ReentrantLock

В примере выше нас спас компилятор. Давайте посмотрим, что произойдет при использовании библиотечной реализации критической секции, например, ReentrantLock:

... val lock = ReentrantLock() ... lock.lock() try {         delay(1)     ++value } finally {     lock.unlock() } ...

Такой код скомпилируется, но синхронизация не будет работать, а может, даже произойдет вылет с IllegalMonitorStateException. Что интересно, если в данном примере использовать lock.withLock { ... }, мы получим уже знакомую нам ошибку компиляции.

То же самое касается и полезнейшего класса ReentrantReadWriteLock, который позволяет экономить на операциях read after read. К сожалению, для синхронизации корутин он не будет работать по той же причине.

Mutex

В примерах с synchronized/ReentrantLock и suspend функциями нас мог кое-где спасти компилятор, а в худшем случае мы бы получили несинхронизированный код с вероятным вылетом в рантайме. К счастью для нас, разработчики корутин подумали о таком сценарии и специально для этого разработали интерфейс Mutex и его стандартную имплементацию. Модифицируем код при помощи Mutex:

... val mutex = Mutex() ... mutex.withLock {     delay(1)     ++value } ...

Такой код отработает корректно: операции над переменной value синхронизированы, suspend функция delay не мешает синхронизации.

К сожалению, класс ReadWriteMutex (аналогичный ReentrantReadWriteLock) по состоянию на июль 2023 года еще не добавили, однако обсуждение давно ведется, и есть уже минимум одна реализация в PullRequest к GitHub Kotlin/kotlinx.coroutines.

Вы можете сказать, что для данного примера синхронизация единственной переменной value при помощи критической секции — не самое концептуально правильное решение. И будете правы, поэтому синхронизацию одиночных переменных рассмотрим в следующем разделе.

Атомарные переменные

Начнем этот раздел со спойлера: volatile (в котлине @Volatile) и классы пакета java.util.concurrent.atomic (например AtomicReference, AtomicInteger…) при правильном применении будут работать с корутинами не хуже, чем со стандартными Java потоками или какими-нибудь многопоточными фреймворками.

volatile / @Volatile

Для начала вспомним, для чего нам вообще нужен @Volatile. Разные потоки для ускорения работы могут кэшировать у себя значения глобальных переменных. Из-за чего в других потоках при обращении к этим переменным мы не всегда будем видеть их актуальное значение. Рассмотрим код, иллюстрирующий суть проблемы:

@Volatile var value1 = true @Volatile var value2 = true  fun main() = runBlocking {     repeat(1_000) {         value1 = true         value2 = true          val job1 = async(Dispatchers.Default) {             value2 = false             while(value1);         }          val job2 = async(Dispatchers.Default) {             value1 = false             while(value2);         }          job1.await()         job2.await()     }      println("Success!") }

Это правильный код, который успешно выполнится, но, если убрать аннотацию @Volatile хотя бы с одной переменной, то тест бесконечно зависнет в одном из циклов while, потому что поток корутины использует кэшированное значение общей переменной.

Однако и @Volatile можно использовать неправильно. Рассмотрим другой пример:

@Volatile var value = true  fun main() = runBlocking {     Array(10_000) {         async(Dispatchers.Default) {             value = !value         }     }.forEach { it.await() }      assertTrue(value) }

В данном тесте запускается 10 000 конкурирующих корутин, которые меняют значение value на противоположное. Так как количество корутин чётное, итоговое значение не должно измениться и должно остаться равным true. Однако в половине случаев тест падает, даже несмотря на @Volatile. Дело в том, что операция value=!value — неатомарная, то есть состоит из нескольких операций: чтение value, потом запись нового value. Если неатомарные операции часто вызываются в любой многопоточной среде, есть риск получить неправильное значение переменной.

java.util.concurrent.atomic

Большую гибкость по сравнению с @Volatile дают классы пакета java.util.concurrent.atomic, которые позволяют делать атомарными операции инкремента, декремента, compareAndSet (атомарно сравнить с предыдущим значением и записать), getAndSet (атомарное получение старого значения и запись нового) и ряд других. Рассмотрим как можно более эффективно и просто синхронизировать уже знакомый нам код из раздела про критические секции:

fun main() = runBlocking {     var value = 0      Array(100_000) {         async(Dispatchers.Default) {             ++value         }     }.forEach { it.await() }      assertEquals(100_000, value) }

В данном примере кода переменная value не синхронизирована, поэтому значение к концу теста value=100_000 не гарантировано. Так как инкремент по умолчанию — неатомарная операция (++value — это по сути value=value+1 чтение и запись), то и @Volatile тут не поможет. Однако, если модифицировать этот код с помощью класса AtomicInteger:

fun main() = runBlocking {     var value = AtomicInteger(0)      Array(100_000) {         async(Dispatchers.Default) {             value.incrementAndGet()         }     }.forEach { it.await() }      assertEquals(100_000, value.get())}

Благодаря атомарности операции incrementAndGet в классе AtomicInteger синхронизация начинает работать правильно. Если задача не ограничивается настолько простыми операциями, и нужно синхронизировать целые участки кода, на помощь приходят уже рассмотренные нами критические секции.

kotlinx.atomicfu

Как я писал в начале раздела, если использовать volatile и классы пакета java.util.concurrent.atomic правильно, то они будет корректно работать и в корутинах. Тогда зачем же нужно что-то еще? — спросите вы. 

Дело в том, что этот тезис касается JVM и Android, однако Kotlin и корутины уже давно вышли за пределы этих двух платформ. Если вы уже разрабатываете на Kotlin Multiplaform или потенциально планируете переход, то без kotlinx.atomicfu вам не обойтись:

fun main() = runBlocking {     var value = atomic(0)      Array(100_000) {         async(Dispatchers.Default) {             value.incrementAndGet()         }     }.forEach { it.await() }      assertEquals(100_000, value.value) }

В любом случае, переходя на kotlinx.atomicfu для JVM и Android, вы ничего не теряете — все функции библиотеки помечены как actual и внутри используют тот же пакет java.util.concurrent.atomic.

Реактивные переменные

Атомарные переменные — это хорошо, но часто необходимо также подписываться на изменение переменной и выполнять в обработчике какой-то код. В этом разделе мы рассмотрим реализации реактивных переменных, то есть классов, которые позволяют:

  • через generic задать тип значения;

  • записывать значение синхронно;

  • получать значение синхронно;

  • хранить это значение как внутреннее состояние;

  • подписываться на изменения значения.

Реализации, не обладающие всеми этими свойствами, в данном разделе рассмотрены не будут.

LiveData

Первое решение, которое мы рассмотрим, — это LiveData. Уверен, многие Android-разработчики не раз сталкивались с этим классом. LiveData позволяет хранить значение, получать и менять его напрямую через геттер\сеттер, подписываться на изменение значения, привязывать подписку к LifecycleOwner. 

Очень полезный класс, долгое время спасавший Android-разработчиков, работает с Android Data Binding и с Jetpack Compose, но не без недостатков. Их мы сейчас и рассмотрим.

Первый недостаток очевиден из того, что я писал выше: класс LiveData — штука специфичная и доступна только для платформы Android, нигде больше использовать его не получится.

Из первого вытекает и второй недостаток: основные методы setValue \ observe должны вызываться только на Main потоке приложения (Dispatchers.Main), иначе будет вылет в рантайме.

val liveData = MutableLiveData(true)  withContext(Dispatchers.Default) {     // IllegalStateException: Cannot invoke setValue on a background thread     liveData.value = false }

Конечно, если использовать LiveData по основному назначению, например, для связи ViewModel + Fragment\Activity, скорее всего, никаких проблем не возникнет. Однако мой опыт говорит о том, что порой разработчики пытаются использовать этот класс еще и на уровне интеракторов\репозиториев и вообще повсюду, где может понадобиться реактивность, что порождает вылеты и замедление Main потока лишними вычислениями. Есть, конечно, метод postValue, который можно вызывать на любом потоке, но не без нюансов:

val liveData = MutableLiveData(true)  withContext(Dispatchers.Default) {     println(liveData.value) // Prints true     liveData.postValue(false)     println(liveData.value) // Also prints true }

Значение false запланировано для записи в liveData на Main потоке, но сразу значение false мы не увидим.

Еще один недостаток: LiveData не дает такой гибкости как, например, RxJava с ее методами преобразования вроде map, filter, distinct, debounce и др. Ну это и понятно: LiveData — это не полноценный фреймворк, а класс, созданный для решения конкретных проблем.

Ну и последнее. Значение LiveData всегда может быть null. Даже если в Kotlin явно указать тип значения без вопроса, например, LiveData<Int>, это не мешает нам записать и считать null.

BehaviorSubject

Похожая на LiveData штука — BehaviorSubject из RxJava. Этот класс хранит значение, позволяет его менять, получать и подписываться на него. В то же время он дает возможность пользоваться всей функциональной мощью RxJava и может применяться для JVM, а не только для платформы Android. Еще один неоспоримый плюс: методы getValue и onNext (по сути setValue) синхронизированы и могут свободно использоваться в любой многопоточной среде, а во время подписки можно выбирать поток обработки с помощью метода observeOn.

Из явных минусов можно выделить то, что nullable значения при записи в BehaviorSubject нужно упаковывать в какой-то буферный класс (а потом распаковывать при получении), потому что null не является валидным значением для этого класса.

Так как в этой статье мы рассматриваем в качестве основного фреймворка для многопоточной разработки именно корутины и Coroutines Flow, было бы странно, особенно для новых проектов, добавлять в проект еще и библиотеку RxJava. Поэтому разработчики корутин разработали собственную реализацию реактивных переменных, которую мы рассмотрим в следующем пункте.

StateFlow

Интерфейс StateFlow и его реализация являются частью Coroutines Flow и решают все проблемы LiveData и BehaviorSubject. Класс хранит значение, позволяет его получать, менять, а также подписываться на его изменение, как и в уже рассмотренных выше классах. Однако есть и ряд преимуществ:

  • Значение value (запись и получение) синхронизировано и может использоваться в любой многопоточной среде.

  • Возможность записи\чтения null контролируется на уровне языка через generics. Если объявить StateFlow<Int?>, то value будет nullable, а если StateFlow<Int>, то нет.

  • StateFlow включает в себя все преимущества Coroutines Flow и возможность вызова suspend функций внутри преобразующих методов или прямо внутри collect.

  • При подписке на изменения есть возможность привязки к LifecycleOwner (через lifecycleScope) для Android.

  • Работает вместе с Android Data Binding, Jetpack Compose и даже Compose Multiplatform.

  • StateFlow работает не только для Android и JVM, но еще и для других платформ фреймворка Kotlin Multiplatform.

StateFlow — достаточно мощный и универсальный инструмент, который может использоваться для широкого спектра задач, таких как: связь ViewModel и View в архитектуре MVVM, хранение изменяющихся данных на уровне репозиториев с возможностью подписки на них, преобразование данных через функции Coroutines Flow и даже конструирование примитивов синхронизации, на чем мы подробно остановимся в следующем разделе.

Барьерная синхронизация

В данном разделе будет больше креатива — в корутинах пока еще нет стандартных реализаций барьерной синхронизации, поэтому нам придется самим написать этот код. Обычно под барьерной синхронизацией понимается следующее: несколько разных потоков ждут какого-то события, а когда оно происходит, потоки одновременно выходят из ожидания и продолжают свою работу. В случае с корутинами будет примерно то же самое, но ждать события будут не потоки, а, собственно, корутины. Значит, и метод ожидания будет suspend функцией. Выразим это через интерфейс:

interface Barrier {     suspend fun await()      @Throws(TimeoutCancellationException::class)     suspend fun await(timeout: Duration) }

Очень простой интерфейс всего с двумя функциями: одна ждет с таймаутом, вторая без. Далее в этом разделе мы будем реализовывать недостающие нам классы барьерной синхронизации из пакета java.util.concurrent при помощи этого интерфейса.

CountDownLatch

Класс CountDownLatch из пакета java.util.concurrent действует следующим образом: в одних потоках мы уменьшаем счетчик на 1, а в других потоках ожидаем, пока счетчик не станет равным 0. Напомним функциональность этого класса:

  • задаем счетчик один раз в конструкторе, значение счетчика больше или равно нулю;

  • уменьшение счетчика на единицу (метод countDown);

  • получение значения счетчика синхронно (метод getCount);

  • ожидание обнуления счетчика с таймаутом (метод await).

Достаточно простой и полезный класс, но есть проблема, которая не позволяет эффективно использовать этот класс в корутинах. Проблема — в методе ожидания обнуления счетчика (метод await). Он не является suspend функцией и блокирует поток, а таймаут вызывает InterruptedException. Из-за этого эффективность корутин падает: текущий поток корутины не освобождается для задач в других корутинах. Также страдает и отменяемость: при вызове метода cancel (на текущей корутине или на всем scope целиком) корутина, зависшая в блокирующем методе await, не сможет отмениться, по крайней мере не сразу.

StateFlow — универсальный и мощный класс, который позволяет нам написать собственную реализацию CountDownLatch меньше чем за 50 строчек кода.

class CountDownBarrier(count: UInt) : Barrier {     private val stateFlow = MutableStateFlow(count)      val counterValue: UInt         @Synchronized get() = stateFlow.value      @Synchronized     fun countDown() {         if (stateFlow.value > 0u) {             --stateFlow.value         }     }      override suspend fun await() {         internalAwait()     }      @Throws(TimeoutCancellationException::class)     override suspend fun await(timeout: Duration) {         if (counterValue > 0u) {             withTimeout(timeout) { internalAwait() }         }     }      private suspend fun internalAwait() {         if (counterValue > 0u) {             // Await first value lower than 0 (suspend function).             stateFlow.first { it <= 0u }         }     } }

В данной реализации вся изначальная функциональность CountDownLatch сохранилась, зато метод await теперь является suspend функцией. Использование @Synchronized тут оправдано, потому что должна быть возможность вызывать метод countDown и геттер на counterValue и вне корутин. Кроме того, внутри блоков @Synchronized никаких suspend функций не вызывается. Разработаем тест для класса CountDownBarrier:

fun main() = runBlocking {     val сountDownBarrier = CountDownBarrier(5u)      val awaitTasks = Array(10) { index ->         async(Dispatchers.Default) {             сountDownBarrier.await()             println("Await finished: index=$index, counter=${сountDownBarrier.counterValue}")         }     }      val countDownTasks = Array(1000) { index ->         async(Dispatchers.Default) {             println("Count down started: index=$index, counter=${сountDownBarrier.counterValue}")             сountDownBarrier.countDown()             println("Count down finished: index=$index, counter=${сountDownBarrier.counterValue}")         }     }      awaitTasks.forEach { it.await() }     countDownTasks.forEach { it.await() }      println("Success: counter=${сountDownBarrier.counterValue}") }

Счетчик CountDownBarrier задан на 5, значит, не раньше чем через 5 вызовов метода countDown мы должны увидеть первые логи Await finished. Посмотрим, что выведет этот код:

Count down started: index=0, counter=5
Count down started: index=0, counter=5
Count down finished: index=0, counter=4
Count down started: index=1, counter=4
Count down finished: index=1, counter=3
Count down started: index=2, counter=3
Count down finished: index=2, counter=2
Count down started: index=3, counter=2
Count down finished: index=3, counter=1
Count down started: index=4, counter=1
Count down finished: index=4, counter=0
Await finished: index=9, counter=0
Await finished: index=8, counter=0

Счетчик обнулился, и только после этого ожидающие корутины среагировали на это и вышли из await, значит, класс CountDownBarrier работает корректно.

CyclicBarrier

Класс CyclicBarrier похож на CountDownLatch, но значение внутреннего счетчика меняется не публичным методом countDown, а в зависимости от количества ожидающих потоков. Когда количество ожидающих потоков становится равно заданному значению, все они одновременно выходят из ожидания. После освобождения всех потоков внутренний счетчик возвращается в начальное значение. Реализуем примитив барьера для корутин при помощи StateFlow:

class CoroutinesBarrier( val initialCoroutinesCount: UShort ) : Barrier {      // @Volatile is not necessary here: this field is only used in @Synchronized blocks.     private var stateFlow = MutableStateFlow(initialCoroutinesCount)      val countLeftToReleaseBarrier: UShort         @Synchronized get() = stateFlow.value      override suspend fun await() {         internalAwait()     }      @Throws(TimeoutCancellationException::class)     override suspend fun await(timeout: Duration) {         withTimeout(timeout) { internalAwait() }     }      private suspend fun internalAwait() {         val (flowToAwait, countLeftToRelease) = countDownOrResetBarrier()          if (countLeftToRelease > 0u) {             // Await first value lower than 0 (suspend function).             flowToAwait.first { it <= 0u }         }     }      @Synchronized     private fun countDownOrResetBarrier(): Pair<Flow<UShort>, UShort> {         if (stateFlow.value > 0u) {             --stateFlow.value         }          val result = stateFlow to stateFlow.value          // Reset flow right before releasing awaiting coroutines.         if (stateFlow.value <= 0u) {             stateFlow = MutableStateFlow(initialCoroutinesCount)         }          return result     } }

В отличие от CyclicBarrier в конструкторе мы задаем не количество потоков, а количество корутин, при котором барьер освобождается. Чтобы можно было сбрасывать значение счетчика в начальное значение, stateFlow объявлена как var. При сбросе счетчика мы записываем в нее новый экземпляр MutableStateFlow. Дополнительная синхронизация (например, @Volatile) на переменную stateFlow не нужна, так как все обращения к этой переменной происходят только внутри блоков @Synchronized. Разработаем тест для класса CoroutinesBarrier:

fun main() = runBlocking {     val startTimeMillis = System.currentTimeMillis()     val coroutinesBarrier = CoroutinesBarrier(3u)      val awaitTasks = Array(6) { index ->         async(Dispatchers.Default) {             println("Coroutine started.")             delay(index * 100L)             coroutinesBarrier.await(5_000.milliseconds)             val durationMillis = System.currentTimeMillis() - startTimeMillis             println("Coroutine released: duration=$durationMillis.")         }     }.forEach { it.await() }      val durationMillis = System.currentTimeMillis() - startTimeMillis     println("Test finished: duration=$durationMillis.")  }

В этом тесте мы запускаем шесть корутин с увеличивающейся задержкой (100, 200 … 600 миллисекунд). Сразу после задержки выставлен барьер на три корутины. Посмотрим, что выведет тест:

Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine released: duration=285.
Coroutine released: duration=285.
Coroutine released: duration=285.
Coroutine released: duration=583.
Coroutine released: duration=583.
Coroutine released: duration=583.
Test finished: duration=583.

Из результатов видно, что барьер работает. Корутины выходили из барьера по три штуки: первые три — спустя 285 миллисекунд, вторые три — спустя 583 миллисекунды. Однако, если в этом тесте увеличить количество корутин с шести до семи, то вместо вывода Test finished программа вылетит с ошибкой спустя пять секунд:

kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 5000 ms

Это логично, так как семь на три не делится нацело, значит две пачки корутин по три штуки пройдут барьер, а одна последняя корутина будет ждать до таймаута.

Заключение

В данной статье мы рассмотрели, как правильно применять прототипы синхронизации потоков в контексте корутин, выделили плюсы и минусы существующих решений, а также написали собственную реализацию примитивов барьерной синхронизации для корутин. В следующей статье мы разберем реактивные потоки, раздельный доступ, семафоры и акторы.


ссылка на оригинал статьи https://habr.com/ru/companies/garage8/articles/747948/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *