Вероятнее всего у вас спрашивали на собесе «как работают корутины под капотом?», вы не долго думая выбрасывали что‑то в стиле «там под капотом стейт‑машина, она определяет какая suspend функция будет выполняться», но понимали ли вы на самом деле всё о чем говорили? Возможно, только вам это известно, но если честно я очень плохо понимал собственные ответы на такие вопросы как бы это парадоксально не звучало и даже после десятка пройденных собесов у меня не было полноценной картины как работает внутрянка этой поистине невероятной библиотеки «сладкой асинхронщины».
Ладно, вступление и так получилось слишком затянутым, как это бывает в непрофессиональных фильмах от которых ждешь экшена половину хронометража, а потом получаешь унылые бои на светящихся палках, погнали короче разбираться!
Знакомство с крутыми перцами: CoroutineContext и CoroutineScope
Начнём с простенького примера:
fun main() = runBlocking { // запускаем новую корутину launch { println("Hello, I'm a Kotlin coroutine, how are you?") } }
Пока нас интересует только функция launch
, которая чаще всего используется для создания корутин, давайте провалимся в её исходники:
// (1) launch является Kotlin Extension функцией для CoroutineScope fun CoroutineScope.launch( // (2) контекст похож на HashMap'у, также хранит всякие штуки по ключу context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { // (3) при создании новой корутины контекст может быть изменён val newContext = newCoroutineContext(context) // к остальной части кода вернёмся позже }
Пройдёмся подробнее по пунктам, которые я выделил в комментах к коду:
1) Чтобы создать новую корутину нужно вызвать launch
в пределах CoroutineScope
, никак иначе, так было сделано чтобы корутина смогла получить контекст и пробрасывать его в дочерние корутины, если глянуть исходник CoroutineScope
, то всё станет очевидным:
interface CoroutineScope { val coroutineContext: CoroutineContext }
При запуске корутины берётся текущий контекст из CoroutineScope
в котором был вызван launch
.
2) CoroutineContext
не является хэш-таблицей, а реализован на основе паттерна Компоновщик:
/* основной прикол паттерна Компоновщик: создать дерево из вложенных друг в друга объектов чтобы реализовать такой паттерн нужен общий родитель, которым в данном случае является CoroutineContext */ /* обычные элементы контекста, такие как Job, CoroutineName и тд являются простыми объектами, которые не содержат другие или в терминах паттерна листовыми узлами */ interface Element : CoroutineContext { val key: Key<*> /* для удобного доступа к элементам контекста переопределён оператор get это позволяет делать такие штуки: coroutineContext[Job] вместо coroutineContext.get(Job) */ override operator fun <E : Element> get(key: Key<E>): E? = if (this.key == key) this as E else null } /* помимо листовых узлов есть комплексные наборы данных, которые могут в себя включать другие такие наборы и простые объекты */ class CombinedContext( val left: CoroutineContext, val element: CoroutineContext.Element ) : CoroutineContext { override fun <E : Element> get(key: Key<E>): E? { /* логика простая: проверяем сначала простой объект, если ключи не совпадают, смотрим left, если он является комплексным узлом CombinedContext, рекурсивно повторяем */ var currentContext = this while (true) { currentContext.element[key]?.let { return it } val next = currentContext.left if (next is CombinedContext) { currentContext = next } else { return currentContext.get(key) } } } } class CoroutineName(val name: String) : CoroutineContext.Element { /* в качестве ключей для обычных элементов CoroutineContext используются названия самих классов, что очень удобно поле key требует наследника Key<*> который определён ниже через companion object, это работает даже если на первый взгляд выглядит сомнительно и неоднозначно */ override val key: Key<*> = CoroutineName companion object Key : Key<CoroutineName> } fun main() { // Job и CoroutineDispatcher являются элементами CoroutineContext val combinedContext = CombinedContext( CombinedContext( Job(), Dispatchers.Default ), CoroutineName("My name's Kotlin coroutine") ) /* в итоге мы можем положить в CoroutineContext то что нужно корутинам: Job, CoroutineName, CoroutineDispatcher, CoroutineExceptionHandler и тд, а затем прокидывать контекст через CoroutineScope в сами корутины */ val job = combinedContext[Job] }
3) При создании новой корутины можно изменить CoroutineContext
:
launch(CoroutineName("I'm a parent coroutine")) { launch(CoroutineName("I'm child coroutine"))) { // ... } }
Логика работы в таком случае следующая: корутина получает текущий контекст из CoroutineScope
и складывает его с контекстом, переданным в качестве параметра функции launch
, таким образом дочерняя корутина из примера содержит другое имя.
Важный момент: не все элементы CoroutineContext
могут быть корректно изменены, например при указании у дочерней корутины другой Job
вы можете разрушить принцип Structured Concurrency, который заключается в каскадной отмене всех корутин, покажу на примере:
val topLevelJob = viewModelScope.launch { /* тут всё ок, при создании новой корутины возьмётся Job'а из текущего контекста, в нашем случае это topLevelJob, затем корутина будет добавлена как дочерняя Job'а и связь родитель - ребёнок не нарушится */ launch { println("I'm coroutine #1") } /* не ок, так как мы создаём Job'у которая не привязана к родительской, грубо говоря мы не сделали parentJob.attachChild(childJob), который кстати нельзя сделать потому что это internal api библиотеки */ launch(Job()) { println("I'm coroutine #2") } } // вторая дочерняя корутина не будет отменена topLevelJob.cancel()
Подведём итоги:
-
корутина создаётся через Kotlin Extension функцию
CoroutineScope.launch
-
CoroutineScope
является простым интерфейсом, который предоставляет корутине контекст -
CoroutineContext
нужен для хранения всяких полезных штук при выполнении корутины:CoroutineName
,CoroutineDispatcher
,Job
,CoroutineExceptionHandler
-
при создании корутины можно передать новый контекст, это приведёт к созданию контекста, основанного на текущем с изменёнными элементами, взятыми из нового
-
не все элементы контекста можно адекватно изменить, например нужно быть осторожным при указании другой
Job
‘ы, отличным решением будет придерживаться правила: менятьJob
только для самых высокоуровневых корутин, в идеале при созданииCoroutineScope
Continuation интерфейс и реализация suspend блока
Возвращаемся снова к начальному примеру:
fun main() = runBlocking { launch { println("Hello, I'm a Kotlin coroutine, how are you?") } } public fun CoroutineScope.launch( // ... block: suspend CoroutineScope.() -> Unit ): Job {}
Обратите внимание на ключевое слово suspend
, именно благодаря тому что launch
выполняет лямбду помеченную этим ключевым словом мы можем запускать suspend
функции в пределах корутины, что ещё интереснее эта лямбда во время компиляции превращается в нечто интересное:
BuildersKt.launch$default($this$runBlocking, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) { int label = 0; public final Object invokeSuspend(Object var1) { Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch (this.label) { case 0: ResultKt.throwOnFailure(var1); String var2 = "Some"; System.out.println(var2); return Unit.INSTANCE; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } } // этот метод будет использоваться для создания Continuation объекта public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Function2 var3 = new <anonymous constructor>(completion); return var3; } }), 3, (Object)null);
Ага, вот она та самая стейт-машина (switch блок), про которую на собесе вскользь упонимают, сейчас она ничего сложного не делает, кроме как выводит текст в консоль и возвращает пустой результат.
После достаточно долгих вечеров копания в исходниках и процесса дебага я всё таки выяснил, что сгенерированный выше код это ничто иное как реализация абстрактного класса ContinuationImpl
, одного из наследников Continuation
:
/* само название говорит за себя, что это "продолжение" после приостановки корутины, да та самая магическая приостановка или SUSPENDED состояние о которой говорят постоянно, позже узнаем что никакой магии тут нет */ public interface Continuation<in T> { /* как мы выяснили в предыдущем разделе CoroutineContext содержит важные штуки для корутин, например CoroutineDispatcher, который может пригодиться для переключения потоков */ public val context: CoroutineContext /* вызов этого метода происходит после возвращения корутины из состояния приостановки, именно сюда кладутся результаты suspend функций, также дочерние корутины могут вызывать этот метод у родительских для продолжения работы последних */ public fun resumeWith(result: Result<T>) }
Без такой штуки как Continuation
корутины не могли бы возвращаться туда где произошла приостановка и следовательно мы не могли бы выполнять код на разных потоках, используя последовательную запись кода, напомню что одной из ключевых идей как раз и является выполнение асинхронного кода, как последовательного, небольшой пример:
// создаём корутину на главном потоке launch { // вызываем функцию fetchAndroidUnderTheHoodPosts() в background потоке val myPosts = fetchAndroidUnderTheHoodPosts() /* чтобы следующий код получил результат нужно как минимум каким-то образом получить его и не забыть переключиться на главный поток, а так как fetchAndroidUnderTheHoodPosts() выполняется на другом потоке и неизвестно когда функция закончит своё выполнение, остаётся только вариант передачи в функцию callback'а, который будет вызван когда она завершится, таким callback'ом является Continuation объект */ println(myPosts) }
После того как функция fetchAndroidUnderTheHoodPosts
завершит выполнение своего кода, результат будет передан через Continuation.resumeWith()
, что приведёт к дальнейшему выполнение корутины, в текущем примере — вывод всех постов в консоль.
Окей, мы определились что без Continuation
ничего не выйдет и даже узнали что suspend блок в launch
функции на самом деле тоже является Continuation
объектом и наследуется от ContinuationImpl
, но основная реализация содержится в BaseContinuationImpl
классе, от которого наследуется ContinuationImpl
, сложно? Ничего, привыкайте, это хардкор, а не няшный мульт про поней.
Идём смотреть на реализацию Continuation
для suspend блока:
/* компилятор генерирует реализацию этого абстрактного класса для suspend блоков */ internal abstract class ContinuationImpl( completion: Continuation<Any?>?, // контекст берётся из корутины, дальше это увидим private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { constructor(completion: Continuation<Any?>?) : this(completion, completion?.context) public override val context: CoroutineContext get() = _context!! private var intercepted: Continuation<Any?>? = null /* ContinuationInterceptor это штука которая оборачивает текущий Continuation объект в новый, например DispatchedContinuation и возвращает его, такой механизм используется для переключения потоков через CoroutineDispatcher, кстати оборачивание одного объекта в другой с общим интерфейсом (Continuation) ничто иное как паттерн Декоратор */ public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } // очистка обёрнутого Continuation объекта: // зануление ненужных ссылок и тд protected override fun releaseIntercepted() { val intercepted = intercepted if (intercepted != null && intercepted !== this) { context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted) } // корутина закончила своё выполнение this.intercepted = CompletedContinuation } } /* если в ContinuationImpl реализована поддержка CoroutineDispatcher'ов, то в BaseContinuationImpl содержится основная логика работы с состоянием приостановки */ internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable {и public final override fun resumeWith(result: Result<Any?>) { var current = this var param = result while (true) { with(current) { /* текущая реализация Continuation требует в качестве completion более высокоуровневый Continuation объект, обычно им является сама корутина напоминаю что мы сейчас рассматриваем реализацию suspend блока в launch функции, а не саму корутину */ val completion = completion!! val outcome: Result<Any?> = try { /* вот тут происходит самое интересное, invokeSuspend выполняет внутри себя тот самый сгенерированный код в котором содержатся наши suspend функции и если одна из них перешла в состояние приостановки, метод тупо делает return, Continuation в данном случае не продолжит своё выполнение пока не будет снова вызван resumeWith() */ val outcome = invokeSuspend(param) /* COROUTINE_SUSPENDED - зарезервированная константа, сигнализирующая о том, что внутри invokeSuspend произошла приостановка */ if (outcome === COROUTINE_SUSPENDED) return // если invokeSuspend вернул результат получаем его Result.success(outcome) } catch (exception: Throwable) { // если произошло исключение тоже получаем его Result.failure(exception) } /* так как текущий BaseContinuationImpl получил результат, значит suspend блок в корутине завершился, поэтому текущий объект BaseContinuationImpl больше не нужен, а следовательно всякие используемые штуки, такие как CoroutineDispatcher'ы например должен быть очищены */ releaseIntercepted() /* если мы запустили корутину в другой корутине, то в качестве completion будет suspend блок родительской корутины: launch { родительский suspend блок BaseContinuationImpl launch { дочерний suspend блок BaseContinuationImpl } } */ if (completion is BaseContinuationImpl) { current = completion param = outcome } else { /* вызывается самый высокоуровневый Continuation объект, в большинстве случаев это сама корутина */ completion.resumeWith(outcome) return } } } } // тот самый метод, переопределённый в сгенерированном коде protected abstract fun invokeSuspend(result: Result<Any?>): Any? }
Суммируем:
-
мы имеем
Continuation
интерфейс, позволяющий продолжить выполнение корутины после её приостановки -
для suspend блока генерируется специальная реализация
ContinuationImpl
со стейт-машиной (switch или when конструкцией) в переопределённом методеinvokeSuspend()
-
когда suspend функция приостанавливается происходят следующие вещи:
—invokeSuspend()
возвращает специальное значениеCOROUTINE_SUSPENDED
—BaseContinuationImpl
завершается через return и ожидает следующего вызоваresumeWith()
-
логика обработки состояния приостановки содержится в
BaseContinuationImpl
, а логика переключения потоков с помощьюCoroutineDispatcher
‘ов происходит в наследникеContinuationImpl
.
Что же такое корутина?
Теперь мы можем разобраться чем же на самом деле является корутина и как происходит выполнение suspend блока в ней, для этого снова возвращаемся к исходникам launch
функции:
public fun CoroutineScope.launch( // ... block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) /* мы не будем рассматривать LazyStandaloneCoroutine, так как эта штука очень похожа на базовую реализацию корутины StandaloneCoroutine, только запускается по требованию */ val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) /* запускаем корутину, LazyStandaloneCoroutine не запустится таким образом, нужно будет вручную вызвать метод start() у объекта Job, который возвращает launch */ coroutine.start(start, coroutine, block) return coroutine }
Помните в BaseContinuationImpl
был такой параметр как completion, я ещё говорил что им может быть сама корутина, так вот StandaloneCoroutine
и есть реализация корутины, короче не будем тянуть, идём смотреть исходники:
// корутина наследует Job, Continuation и CoroutineScope class StandaloneCoroutine<in T>( parentContext: CoroutineContext, active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { init { /* кладёт Job текущей корутины в Job'у родительской это нужно чтобы родительские корутины знали о дочерних и не завершались раньше них */ initParentJob(parentContext[Job]) } /* так можно сделать потому что StandaloneCoroutine является реализацией Job, а Job является одним из элементов CoroutineContext'а */ override val context: CoroutineContext = parentContext + this // вы уже знаете, что когда корутина выходит из состояния приостановки, // вызывается метод Continuation.resumeWith() override fun resumeWith(result: Result<T>) { // makeCompletingOnce пытается завершить Job'у корутины val state = makeCompletingOnce(result.toState()) // если у текущей Job'ы есть дочерние и они не были завершены, // корутина не может быть завершена if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) } /* запуск suspend блока в корутине происходит в CoroutineStart enum'е, в качестве параметра receiver передаётся сама корутина, это нужно чтобы suspend блок получил CoroutineContext, если уже забыли про это, возвращайтесь к исходнику ContinuationImpl completion это как раз то самый высокоуровневый Continuation объект, вызываемый из сгенерированного Continuation объекта для suspend блока, как я уже говорил ранее им является сама корутина */ fun <R> start( start: CoroutineStart, receiver: R, block: suspend R.() -> T ) { start(block = block, receiver = receiver, competion = this) } } // поначалу я долго искал где запускается корутина, так как не сразу // заметил что у CoroutineStart переопределён invoke оператор enum class CoroutineStart { // я опустил остальные варианты, оставил только базовый DEFAULT; operator fun <R, T> invoke( block: suspend R.() -> T, receiver: R, completion: Continuation<T> ): Unit = when (this) { // для suspend блока будет сгенерированна ContinuationImpl // реализация, которая запустится при вызове launch функции DEFAULT -> block.startCoroutineCancellable(receiver, completion) else -> Unit } }
Осталось только разобраться как все таки запускается suspend блок, для этого проваливаемся ещё на уровень ниже, в startCoroutineCancellable()
функцию:
// обратите внимание, что startCoroutineCancellable() является // Kotlin Extension функцией для suspend блока internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation<T>, onCancellation: ((cause: Throwable) -> Unit)? = null ) = // первым делом из suspend блока создаётся Continuation объект createCoroutineUnintercepted(receiver, completion) /* далее оборачивается в DispatchedContinuation, если используется CoroutineDispatcher, а он в большинстве случаев используется */ .intercepted() /* ну и происходит вызов Continuation.resumeWith() в зависимости от типа Continuation, чтобы блок в корутине начал выполняться, иначе ничего не произойдёт */ .resumeCancellableWith(Result.success(Unit), onCancellation) // создаёт Continuation из suspend блока actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( receiver: R, completion: Continuation<T> ): Continuation<Unit> { /* в нашем случае suspend блок является объектом ContinuationImpl, а это наследник BaseContinuationImpl, поэтому выполняется первая ветка */ return if (this is BaseContinuationImpl) // метод create будет сгенерирован компилятором для ContinuationImpl // реализации, кстати ранее уже был пример сгенерированного кода create(receiver, completion) else ... } /* делает вызов resumeWith в зависимости от типа Continuation: 1) DispatchedContinuation может передать вызов resumeWith() CoroutineDispatcher'у, который выполнит код на другом потоке 2) обычный вызов Continuation.resumeWith() произойдёт без смены потоков и тд */ fun <T> Continuation<T>.resumeCancellableWith( result: Result<T>, onCancellation: ((cause: Throwable) -> Unit)? = null ): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result, onCancellation) else -> resumeWith(result) }
Можно сказать что вы прошли джедайское обучение и готовы к настоящему бою, разберём самый первый пример:
fun main() = runBlocking { // запускаем новую корутину launch { println("Hello, I'm a Kotlin coroutine, how are you?") } }
Если посмотреть на пример с точки зрения внутрянки корутин, то получим примерно следующий код:
fun main() { SuspendLaunchBlock( // (2) completion = StandaloneCoroutine() ).resumeWith(Result.success(Unit)) // (3) } // (1) class SuspendLaunchBlock( completion: StandaloneCoroutine<Any?> ) : ContinuationImpl(completion) { var label = 0 // (5) fun resumeWith(result: Result<Any?>) { try { val newResult = invokeSuspend(result) if (newResult === COROUTINE_SUSPENDED) return Result.success(newResult) } catch (exception: Throwable) { Result.failure(exception) } completion.resumeWith(newResult) } // (4) fun invokeSuspend(result: Result<Any?>): Any? { when(label) { 0 -> { throwIfFailureResult(result) println("Hello, I'm a Kotlin coroutine, how are you?") return Unit } else -> error("Illegal state") } } } class StandaloneCoroutine<T>(...) : Continuation<T> { // (6) fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) } }
Давайте по порядку:
-
Создаётся сгенерированная реализация
SuspendLaunchBlock
для suspend блока вlaunch
функции со стейт-машиной или конкретнее when конструкцией. -
Создаётся
StandaloneCoroutine
и передаётся в качестве параметра completion вSuspendLaunchBlock
-
Запускается корутина через вызов
SuspendLaunchBlock.resumeWith()
метода, который далее выполняет сгенерированныйinvokeSuspend()
метод -
В
invokeSuspend()
выполняется единственная ветка в when блоке — вывод в консоль и возвращение пустого результата -
После завершения
invokeSuspend()
вSuspendLaunchBlock.resumeWith()
происходит сначала проверка на состояние приостановки, в данном случаеinvokeSuspend()
выполнилась без приостановки, поэтому сразу вызываетсяcompletion.resumeWith()
, а так как completion этоStandaloneCoroutine
, то вызываетсяStandaloneCoroutine.resumeWith()
реализация -
StandaloneCoroutine.resumeWith()
проверяет нет ли незавершенных дочерних корутин, у нас их нет, и прекращает выполнение
Вы практически магистр джедаев! Сделайте паузу, заварите чай или кофе, скушайте шоколадку и ещё раз пройдитесь по примеру, очень важно понять что механизм корутин заключается в переходах между состояниями приостановки через вызов Continuation.resumeWith()
метода, а завершение происходит в корневом Continuation
объекте и самое главное, что нет никакой магии.
А если в корутине цепочка из suspend функций?
Усложним пример из прошлого раздела, добавив две suspend функции:
// опустим подробности реализации suspend fun fetchAuthToken() = ... suspend fun fetchProfileData(token: String) = ... fun main() = runBlocking { // запускаем корутины с двумя suspend функциями launch { val token = fetchAuthToken() val profile = fetchProfileData(token = token) println(profile) } }
Интересно какой теперь будет код с точки зрения внутрянки корутин, смотрим:
fun main() { SuspendLaunchBlock( // (2) completion = StandaloneCoroutine() ).resumeWith(Result.success(Unit)) // (3) } // (5) suspend fun fetchAuthToken(continuation: SuspendLaunchBlock): Any? { /* любое асинхронное выполнение кода приводит к состоянию приостановки это необязательно использование многопоточности, дальше вы это увидите runCodeInBackground - магический метод, который выполняет блок кода в фоновом потоке */ runCodeInBackground { val token = ... runCodeInMain { // (6) // магический метод, который выполняет блок кода на главном потоке continuation.resumeWith(token) } } return COROUTINE_SUSPENDED } suspend fun fetchProfileData(token: String) = ... // (1) class SuspendLaunchBlock( completion: StandaloneCoroutine<Any?> ) : ContinuationImpl(completion) { var label = 0 fun resumeWith(result: Result<Any?>) { try { val newResult = invokeSuspend(result) if (outcome === COROUTINE_SUSPENDED) return Result.success(newResult) } catch (exception: Throwable) { Result.failure(exception) } // (9) completion.resumeWith(newResult) } fun invokeSuspend(result: Result<Any?>): Any? { // while(true) нужен чтобы выполнять ветки дальше, // если suspend функция не перешла в состояние приостановки while (true) { when(label) { // (4) 0 -> { throwIfFailureResult(result) label = 1 // Continuation передаётся в качестве // аргумента suspend функции val state = fetchAuthToken(this) if (state == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } } // (7) 1 -> { throwIfFailureResult(result) label = 2 val token = result.unwrap() val state = fetchProfileData(token, this) if (state == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } } // (8) 2 -> { throwIfFailureResult(result) val profile = result.unwrap() println(profile) break } else -> error("Illegal state") } } return Unit } } class StandaloneCoroutine<T>(...) : Continuation<T> { // (10) fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) } }
Чтобы было поинтереснее будем считать, что обе suspend функции переходят в состояние приостановки, смотрим логику:
-
Создаётся сгенерированная реализация
SuspendLaunchBlock
для suspend блока вlaunch
функции со стейт-машиной или конкретнее when конструкцией. -
Создаётся
StandaloneCoroutine
и передаётся в качестве параметра completion вSuspendLaunchBlock
-
Запускается корутина через вызов
SuspendLaunchBlock.resumeWith()
метода, который далее выполняет сгенерированныйinvokeSuspend()
метод -
В
invokeSuspend()
выполняется первая ветка (label == 0), где происходит вызов функцииfetchAuthToken()
, в качестве единственного параметра передаётся текущийContinuation
объект, в данном случае этоSuspendLaunchBlock
, значение переменной label меняется на 1 -
Функция
fetchAuthToken()
возвращает значениеCOROUTINE_SUSPENDED
, что свидетельствует о состоянии приостановки, важно что здесь нет никакой магии, выполнение кода происходит в другом потоке, а так как это асинхронное выполнение, внешнему коду можно только передать результат через callback, которым кстати являетсяSuspendLaunchBlock
-
После выполнение своего кода
fetchAuthToken()
вызывает методSuspendLaunchBlock.resumeWith()
с результатом своей работы, в примере это строка с токеном -
SuspendLaunchBlock.resumeWith()
возобновляет своё выполение и повторно вызывает invokeSuspend(), где уже выполняется вторая ветка (label == 1), в ней происходит вызов fetchProfileData() метода, в качестве первого параметра он принимает токен от предыдущей suspend функцииfetchAuthToken()
, а в качестве второго ссылку на Continuation объект, которым как мы уже знаем являетсяSuspendLaunchBlock
, методfetchProfileData()
выполняется аналогичноfetchAuthToken()
, label становится равным 2 -
В последней ветке
invokeSuspend()
, где label == 2, происходит вывод в консоль результата функцииfetchProfileData()
и возвращение пустого значения -
На этот раз возвращенное значение из
invokeSuspend()
не являетсяCOROUTINE_SUSPENDED
, поэтому выполнениеSuspendLaunchBlock
завершается, дальнейшее управление передаётсяStandaloneCoroutine
через вызовcompletion.resumeWith()
-
StandaloneCoroutine.resumeWith()
проверяет нет ли незавершенных дочерних корутин, у нас их нет, и прекращает выполнение
Отлично, теперь вы знаете как происходят переходы между отдельными suspend функциями, поздравляю, вы можете смело умничать на собесах, но не забывайте что скромность красит человека)
Переключение потоков, delay() и CoroutineDispatcher
Мы забыли о самом важном ради чего в принципе используются корутины — выполнение suspend функций на других потоках, отсюда собственно и возникает потребность приостановить выполнение текущей корутины, как мы уже выяснили для этого нужно передать Continuation
объект suspend функции и ждать пока она сама не вызовет Continuation.resumeWith()
метод:
fun fetchAuthToken(continuation: Continuation<Any?>): Any? { // магический метод, который выполняет блок кода в фоновом потоке runCodeInBackground { val token = ... // магический метод, который выполняет блок кода на главном потоке runCodeInMain { /* чтобы сообщить корутине что suspend функция завершила своё выполнение нужно вызвать Continuation.resumeWith() с результатом работы функции */ continuation.resumeWith(token) } } // так как функция не может сразу вернуть результат, // она переходит в состояние приостановки return COROUTINE_SUSPENDED }
Это достаточно упрощенная версия кода, но зато она отражают общий механизм, а самое главное показывает что состояние приостановки это ничто иное как выполнение некоторого кода на другом потоке и возвращение результата через Continuation.resumeWith
как через обычный callback.
В прошлом разделе я вскользь упомянул, что любое асинхронное выполнение приводит к состоянию приостановки, но не раскрыл эту тему, давайте разбираться.
Что нужно чтобы код стал асинхронным? Правильно сделать его выполнение независимым от текущей точки выполнения, например создать новый поток:
fun main() { // создаётся новый поток и сразу запускается Thread { val sum = 3 + 7 println(sum) }.start() // код main() продолжает выполняться независимо от того, // выполнился ли весь код в Thread val mul = 3 * 7 println(mul) // функция main() может завершиться раньше созданного потока }
Но давайте на минутку забудем про многопоточность и вспомним главный поток Android, он позволит нам выполнить код асинхронно? Конечно же да, можно это сделать через старый добрый Handler
:
// Handler поставит выполнение кода в очередь главного потока handler.post { println("I'll run soon") }
Кстати Handler
используется в реализации функции delay
для главного потока Android, если забыли, то эта функция позволяет сделать задержку без блокировки текущего потока:
fun delay(continuation: Continuation<Any?>): Any? { val block = Runnable { // после того как задержка пройдёт, выполнится этот блок кода // и корутина продолжит своё выполнение после приостановки continuation.resumeWith(Result.success(Unit)) } // handler.postDelayed() выполняет block через указанный // промежуток времени в миллисекундах if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) { // если корутина была отменена нужно отменить задержку continuation.invokeOnCancellation { handler.removeCallbacks(block) } } else { // отменяет текущую корутину, так как Handler для главного потока // был закрыт cancelOnRejection(continuation.context, block) } return COROUTINE_SUSPENDED }
Вот и вся суть асинхронного выполнения кода, им может быть любой механизм, позволяющий выполнить X код независимо от Y кода и если попробовать дать определение корутинам сейчас, то оно будет примерно такое:
Корутина — это абстракция, которая оборачивает некоторую асинхронную работу, это может быть выполнение кода в другом потоке или использование очереди главного потока, например MessageQueue из Android, в удобный последовательный код с механизмами отмены и другими прикольными фишками.
Ладно, вроде бы разобрались что такое приостановка корутины и асинхронное выполнение кода, можем переходить к более прикладным вещам, например к функции withContext()
, чаще всего используемой для изменения CoroutineDispatcher
‘а:
suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T { /* как мы знаем Continuation работает под капотом корутин и его нельзя получить явно в прикладном коде, поэтому была придумана inline функция suspendCoroutineUninterceptedOrReturn (и не одна кстати), которая после компиляции подставит текущий Continuation объект */ return suspendCoroutineUninterceptedOrReturn sc@ { uCont -> val oldContext = uCont.context /* если вы ещё не забыли то новый контекст производится путём сложения двух контекстов, в качестве результата мы имеем контекст в котором старые элементы заменены новыми, например Dispatchers.Main можно поменять на Dispatchers.Default */ val newContext = oldContext.newCoroutineContext(context) // проверка что корутина все ещё выполняется newContext.ensureActive() // мы не будем рассматривать все ветки, нас интересует только // переключение потоков через CoroutineDispatcher if (newContext === oldContext) { ... } // CoroutineDispatcher является наследником ContinuationInterceptor if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { ... } // ну вот опять создаётся какая-то неизвестная нам корутина, // не беспокойтесь там достаточно простая логика val coroutine = DispatchedCoroutine(newContext, uCont) // стартуем также как и обычную корутину block.startCoroutineCancellable(coroutine, coroutine) /* если есть возможность сразу отдать результат без приостановки корутины то withContext сразу завершится, в противном случае корутина, содержащая withContext() вызов перейдёт в состояние приостановки */ coroutine.getResult() } }
Функция withContext
делает 3 простые вещи:
-
Получает текущий
Continuation
объект, он может быть взят из параметра suspend функции или из текущей корутины в которой была вызвана функцияwithContext
-
Берёт контекст из
Continuation
объекта и складывает с контекстом, переданным в качестве параметра, в результате создаётся новый контекст -
На основе нового контекста создаёт определённый вид корутин, например если был изменён
CoroutineDispatcher
будет создана корутинаDispatchedCoroutine
Что ж давайте теперь глянем исходники DispatchedCoroutine
:
/* чаще всего в качестве continuation параметра выступает Continuation объект, который генерируется для suspend блока в корутине, если забыли, то это реализация абстрактного класса ContinuationImpl */ internal class DispatchedCoroutine<in T> internal constructor( context: CoroutineContext, continuation: Continuation<T> ) : ScopeCoroutine<T>(context, uCont) { /* метод afterResume() вызывается перед завершением Continuation.resumeWith(), когда корутина закончила выполнять все свои suspend функции и у неё больше нет дочерних корутин в состоянии выполнения */ override fun afterResume(state: Any?) { /* метод afterResume() может быть вызван раньше getResult(), например если блок кода в withContext() очень быстро выполнился в таком случае результат вернёт getResult() */ if (tryResume()) return /* Я уже вскользь упоминал что делает каждый метод в этой цепочке, когда мы рассматривали как стартует корутины, ещё раз повторим: intercepted() оборачивает continuation в DispatchedContinuation, который реализует логику работы с CoroutineDispatcher'ами resumeCancellableWith() вызывает resumeWith() в зависимости от типа Continuation, в данном случае будет вызван метод DispatchedContinuation.resumeCancellableWith() */ continuation.intercepted().resumeCancellableWith(recoverResult(state, uCont)) } internal fun getResult(): Any? { // если нельзя сразу вернуть результат корутина приостанавливается if (trySuspend()) return COROUTINE_SUSPENDED // результат выполнения withContext() val state = ... return state as T } }
Суммируем, DispatchedCoroutine
выполняет две ключевые задачи:
-
Добавляет возможность вернуть результат без перехода в состояние приостановки, если такая возможность есть, для этого используется
getResult()
метод. -
Переключает
Continuation
объект, в котором был вызванwithContext()
, на родной поток, например если ваша корутина выполняется на главном потоке, затем вызываетwithContext()
на фоновом, то результат должен вернуться снова на главный.
Ладно, с DispatchedCoroutine
более менее разобрались, чтобы понять как на самом деле происходит переключение потоков в корутинах провалимся в DispatchedContinuation
, в который оборачиваются другие Continuation
объекты:
/* DispatchedContinuation принимает на вход: dispatcher - выполняет блок кода, чаще всего на другом потоке или с использованием очереди, например Handler / MessageQueue из Android continuation - Continuation объект, работа с которым будет происходить через указанный диспатчер */ internal class DispatchedContinuation<in T>( val dispatcher: CoroutineDispatcher, val continuation: Continuation<T> ) : Continuation<T> by continuation { /* логика resumeWith() идентична resumeCancellableWith() с отличием только в разных режимах resumeMode, обычно чаще всего вызывается именно resumeCancellableWith, так как режим MODE_CANCELLABLE позволяет прокинуть CancellationException для отмены корутины */ override fun resumeWith(result: Result<T>) { ... } internal inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) /* CoroutineDispatcher имеет два логически связанных метода: isDispatchNeeded() решает выполнять код в диспатчере или нет dispatch() выполняет код в диспатчере: код может выполниться на другом потоке, поставлен в очередь и тд */ if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE // для наглядности я упростил блок кода и написал его здесь val block = Runnable { continuation.resumeWith(state) } dispatcher.dispatch(context, block) } else { /* если диспатчер не хочет выполнять код, а такое может быть например если диспатчер переключает на главный поток, а мы уже на главном потоке и внутри диспатчера реализована проверка, то isDispatchNeeded() вернёт false, в таком случае выполнение корутины будет добавлено в EventLoop */ executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled(state)) { resumeUndispatchedWith(result) } } } } }
Как видите логика CoroutineDispatcher
‘а достаточно простая:
-
Вызывается метод
isDispatchNeeded()
чтобы понять отдавать выполнение кода диспатчеру или нет, это нужно чтобы избежать лишних вызововdispatch()
, например не делать переключение на главный поток, если мы уже находимся на нём -
Если
isDispatchNeeded()
вернул true разумеется отдаём выполнение кода диспатчеру, вызвав методdispatch()
-
Если
isDispatchNeeded()
вернул false запускаем корутину наEventLoop
‘е, об этом в следующем разделе
В качестве примера рассмотрим такие интересные диспатчеры из Android, как Dispatchers.Main
и Dispatchers.Main.immediate
:
val mainLooper = Looper.getMainLooper() val handler = Handler(mainLooper) // реализация для Dispatchers.Main override fun isDispatchNeeded(...) = true // реализация для Dispatchers.Main.immediate override fun isDispatchNeeded(...): Boolean { // сравнивает Looper текущего потока с главным return Looper.myLooper() != mainLooper } override fun dispatch( context: CoroutineContext, block: Runnable ) { // handler.post() выполняет блок кода на главном потоке handler.post(block) }
Вот и вся разница между ними: Dispatchers.Main
всегда переключает выполнение кода на главный поток через Handler.post()
, а Dispatchers.Main.immediate
только, если код не выполняется на главном потоке.
Для закрепления знаний попробуем собрать всё воедино и описать логику для следующего примера:
// примерно такой код можно встретить в рабочих проектах viewModelScope.launch { // я не стал выносить в отдельную функцию, чтобы не усложнять пример val posts = withContext(Dispatchers.IO) { try { // получаем список постов в background потоке apiService.fetchPosts() } catch (exception: Exception) { // важно прокидывать CancellationException дальше // так как это часть механизма отмены корутины if (exception is CancellationException) throw exception emptyList() } } // отображаем данные на главном потоке println(posts) }
Под капотом весь этот код будет выглядить примерно как-то так:
// (1) class ViewModelScopeLaunchBlock( completion: Continuation<Any?> ) : ContinuationImpl(completion) { var label = 0 // (5) fun resumeWith(result: Result<Any?>) { try { val newResult = invokeSuspend(result) if (outcome === COROUTINE_SUSPENDED) return Result.success(newResult) } catch (exception: Throwable) { Result.failure(exception) } // (17) completion.resumeWith(newResult) } fun invokeSuspend(result: Result<Any?>): Any? { // while(true) нужен чтобы выполнять ветки дальше, // если suspend функция не перешла в состояние приостановки while (true) { when(label) { // (5) 0 -> { throwIfFailureResult(result) label = 1 // Continuation передаётся в качестве // аргумента suspend функции val state = fetchPosts(this) // (10) if (state == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } } // (16) 1 -> { throwIfFailureResult(result) val profile = result.unwrap() println(profile) break } else -> error("Illegal state") } } return Unit } } class StandaloneCoroutine(...) { // (18) fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) } } // (6) class WithContextBlock( completion: DispatchedCoroutine ) : ContinuationImpl(completion) { var label = 0 // (12) fun resumeWith(result: Result<Any?>) { try { val newResult = invokeSuspend(result) if (outcome === COROUTINE_SUSPENDED) return Result.success(newResult) } catch (exception: Throwable) { Result.failure(exception) } // (12) completion.resumeWith(newResult) } // (11) fun invokeSuspend(result: Result<Any?>): Any? { try { val posts = apiService.fetchPosts() return posts } catch (exception: Exception) { // важно прокидывать CancellationException дальше так как это часть // механизма отмены корутины, вспомните resumeCancellableWith if (exception is CancellationException) throw exception return emptyList() } } } class DispatchedCoroutine( ... // (7) val continuation: ViewModelScopeLaunchBlock ): ScopeCoroutine(context, continuation) { // (13) fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) } // (14) override fun afterResume(state: Any?) { if (tryResume()) return continuation.intercepted().resumeCancellableWith(recoverResult(state, uCont)) } } class DispatchedContinuation<in T>( val dispatcher: CoroutineDispatcher, val continuation: Continuation<T> ) : Continuation<T> by continuation { // (4, 9, 15) inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE val block = Runnable { continuation.resumeWith(state) } // (9, 15) dispatcher.dispatch(context, block) } else { // (4) continuation.resumeWith(state) } } } // (2) val topLevelCoroutine = StandaloneCoroutine(...) val viewModelScopeLaunchBlock = ViewModelScopeLaunchBlock( // (2) completion = topLevelCoroutine ) // (3) DispatchedContinuation( dispatcher = Dispachers.Main.immediate continuation = viewModelScopeLaunchBlock ).resumeCancellableWith(Result.success(Unit)) val withContextBlock = WithContextBlock( // (7) completion = DispatchedCoroutine(viewModelScopeLaunchBlock) ) // (8) DispatchedContinuation( dispatcher = Dispatchers.IO, continuation = withContextBlock ).resumeCancellableWith(Result.success(Unit))
Очень запутанно? Согласен, давайте по порядку:
-
Генерируется реализация
ViewModelScopeLaunchBlock
для suspend блока вviewModelScope.launch()
функции со стейт-машиной или конкретнее when конструкцией. -
Создаётся
StandaloneCoroutine
и передаётся в качестве параметра completion вViewModelScopeLaunchBlock
-
ViewModelScopeLaunchBlock
оборачивается вDispatchedContinuation
, в качестве диспатчера передаётся тот, что был указан вviewModelScope
, для Android этим диспатчером будетDispatchers.Main.immediate
-
Вызывается
DispatchedContinuation.resumeCancellableWith()
дляViewModelScopeLaunchBlock
, где происходит проверка на главный поток, но так какviewModelScope.launch()
и так выполняется на главном потоке, то не будет никакого переключения через диспатчер, а методViewModelScopeLaunchBlock.resumeWith()
будет вызван напрямую -
В
ViewModelScopeLaunchBlock.resumeWith()
происходит вызовinvokeSuspend()
, где начинает выполняться первая ветка (label == 0), в которой вызываетсяfetchPosts()
, в качестве первого параметра передаётся ссылка на текущийContinuation
объект, в данном случае этоViewModelScopeLaunchBlock
, переменная label становится равной 1 -
В функции
fetchPosts()
вызываетсяwithContext()
, который также как иviewModelScope.launch()
принимает suspend блок, поэтому генерируется реализацияWithContextBlock
-
Создаётся
DispatchedCoroutine
и передаётся в качестве параметра completion вWithContextBlock
.
Обратите внимание, чтоDispatchedCoroutine
в качестве объектаContinuation
принимаетViewModelScopeLaunchBlock
, блок из которого вызывается функцияwithContext()
, так как нам нужно каким-то образом вернуть результат обратно. -
WithContextBlock
запускается аналогичноViewModelScopeLaunchBlock
, также оборачивается вDispatchedContinuation
, только теперь в качестве диспатчера передаётсяDispatchers.IO
. -
Вызывается
DispatchedContinuation.resumeCancellableWith()
дляWithContextBlock
, где происходит переключение главного потока на background поток черезDispatchers.IO
диспатчер,WithContextBlock.resumeWith()
теперь будет выполняться на background потоке -
DispatchedCoroutine
не может отдать сразу результат запроса и поэтомуfetchPosts()
вViewModelScopeLaunchBlock.invokeSuspend()
возвращаетCOROUTINE_SUSPENDED
, что приводит к приостановке корутины -
Метод
WithContextBlock.invokeSuspend()
выполняет единственную ветку кода — запрос в сеть и получение ответа на background потоке. -
Когда запрос завершится метод
WithContextBlock.invokeSuspend()
вернёт результат вWithContextBlock.resumeWith()
, где произойдёт дальнейшая отправка результата через вызовcompletion.resumeWith()
в объект корутины, в данном случае этоDispatchedCoroutine
-
В
DispatchedCoroutine.resumeWith()
сначала произойдёт проверка на дочерние корутины в состоянии выполнения и если их нет, как в данном примере, выполнится кодDispatchedCoroutine.afterResume()
-
Метод
DispatchedCoroutine.afterResume()
должен вернуть результат вViewModelScopeLaunchBlock
, но здесь есть проблема:WithContextBlock
сейчас выполняется на background потоке, который предоставилDispatchers.IO
, аViewModelScopeLaunchBlock
должен получить результат на главном, поэтому вызывается цепочкаcontinuation.intercepted().resumeCancellableWith()
, методintercepted()
не будет повторно оборачиватьViewModelScopeLaunchBlock
вDispatchedContinuation
, это сделано для оптимизации -
Снова вызывается
DispatchedContinuation.resumeCancellableWith()
дляViewModelScopeLaunchBlock
, но только теперь происходит переключение на главный поток через диспатчерDispatchers.Main.immediate
, если вы не забыли там под капотомHandler.post()
вызов, в итогеViewModelScopeLaunchBlock.resumeWith()
выполняется на главном потоке, а в качестве результата передаётся список постов -
В
ViewModelScopeLaunchBlock.resumeWith()
происходит второй вызовViewModelScopeLaunchBlock.invokeSuspend()
, теперь уже выполняется вторая ветка (label == 1), которая берёт список постов и выводит его в консоль. -
Метод
ViewModelScopeLaunchBlock.invokeSuspend()
завершается успешно без приостановки, поэтомуViewModelScopeLaunchBlock.resumeWith()
заканчивает своё выполнение и делает вызовcompletion.resumeWith()
, где в качестве completion выступает корутинаStandaloneCoroutine
-
В
StandaloneCoroutine.resumeWith()
происходит проверка на дочерние корутины в состоянии выполнения, их в данном примере нет, корутина завершается.
Если вы дошли до этого момента, то вы явно не простой перец, обязательно сделайте перерыв с шоколадкой и попробуйте аналогичным образом проследить путь выполнение корутин в своём коде.
К сожалению это ещё не конец, корутины могут запускаться в других корутинах и не по одной, а целыми пачками, здесь работает похожий механизм с диспатчерами, но помимо него есть ещё и EventLoop
, если вы уже отдохнули и готовы сделать последний рывок, продолжаем!
Дочерние корутины, EventLoop и runBlocking
Выполнение дочерних корутин важно разграничивать на два вида:
-
Дочерняя корутина запускается через диспатчер
-
Дочерняя корутина выполняется на
EventLoop
‘е
Первый случай возникает, когда диспатчер всегда переключает выполнение корутины, даже если она выполняется на правильном потоке, например Dispatchers.Main
, второй же свойственен для диспатчеров, где переключение происходит только по необходимости, яркий пример из Android: Dispatchers.Main.immediate
, который переключает выполнение корутины на главный поток, только если она не выполняется на нём.
Что ж, рассмотрим по порядку оба случая, начнём с первого:
/* Dispatchers.Main всегда переключает выполнение корутины на главный поток через Handler.post() механизм, даже если корутина и так выполняется на главном */ val uiScope = CoroutineScope(Dispatchers.Main + Job()) uiScope.launch { launch { println("I'm child coroutine #1") } launch { println("I'm child coroutine #2") } println("I'm parent coroutine") } // примерно во что всё это превратится class UiScopeParentBlock( completion: StandaloneCoroutine ) : ContinuationImpl(completion) { var label = 0 fun resumeWith(result: Result<Any?>) { try { val newResult = invokeSuspend(result) if (outcome === COROUTINE_SUSPENDED) return Result.success(newResult) } catch (exception: Throwable) { Result.failure(exception) } completion.resumeWith(newResult) } fun invokeSuspend(result: Result<Any?>): Any? { when(label) { 0 -> { throwIfFailureResult(result) println("I'm parent coroutine") return Unit } else -> error("Illegal state") } } } class UiScopeChild1Block( completion: UiScopeParentBlock ) : ContinuationImpl(completion) { var label = 0 fun resumeWith(result: Result<Any?>) { try { val newResult = invokeSuspend(result) if (outcome === COROUTINE_SUSPENDED) return Result.success(newResult) } catch (exception: Throwable) { Result.failure(exception) } completion.resumeWith(newResult) } fun invokeSuspend(result: Result<Any?>): Any? { when(label) { 0 -> { throwIfFailureResult(result) println("I'm child coroutine #1") return Unit } else -> error("Illegal state") } } } class UiScopeChild2Block( completion: UiScopeParentBlock ) : ContinuationImpl(completion) { var label = 0 fun resumeWith(result: Result<Any?>) { try { val newResult = invokeSuspend(result) if (outcome === COROUTINE_SUSPENDED) return Result.success(newResult) } catch (exception: Throwable) { Result.failure(exception) } completion.resumeWith(newResult) } fun invokeSuspend(result: Result<Any?>): Any? { when(label) { 0 -> { throwIfFailureResult(result) println("I'm child coroutine #2") return Unit } else -> error("Illegal state") } } } class StandaloneCoroutine(...) { fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) /* resumeWith() не завершится полностью пока дочерние корутины не закончат своё выполнение когда мы рассматривали код StandaloneCoroutine, то можно было увидеть как происходит добавление Job'ы дочерней корутины в Job'у родительской, поэтому родительская корутина знает состояния своих дочерних корутин */ if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) } } // родительская корутина val parentCoroutine = StandaloneCoroutine(...) val uiScopeParentBlock = UiScopeParentBlock( completion = parentCoroutine ) DispatchedContinuation( dispatcher = Dispachers.Main continuation = uiScopeParentBlock ).resumeCancellableWith(Result.success(Unit)) // первая дочерняя корутина val childCoroutine1 = StandaloneCoroutine(...) val uiScopeChild1Block = UiScopeChild1Block( completion = childCoroutine1 ) DispatchedContinuation( dispatcher = Dispachers.Main continuation = uiScopeChild1Block ).resumeCancellableWith(Result.success(Unit)) // вторая дочерняя корутина val childCoroutine2 = StandaloneCoroutine(...) val uiScopeChild1Block = UiScopeChild1Block( completion = childCoroutine2 ) DispatchedContinuation( dispatcher = Dispachers.Main continuation = uiScopeChild1Block ).resumeCancellableWith(Result.success(Unit))
Я не буду расписывать отдельные шаги, как это было в прошлом разделе, вы сами прекрасно можете справиться с этим, к тому же это будет хорошей практикой для закрепления знаний, вкратце основная суть:
-
Родительская корутина
StandaloneCoroutine
не будет завершена до тех пора пока все её дочерние корутины не выполнились. При создании новой корутины её объектJob
добавляется в качестве дочернего элемента в объектJob
родительской корутины, благодаря этому корутины могут отслеживать состояния своих детей. -
suspend блоки дочерних корутин
UiScopeChild1Block
иUiScopeChild2Block
будут обёрнуты вDispatchedContinuation
и переключены на главный поток черезHandler.post()
в не зависимости была ли родительская корутина изначательно на главном потоке или нет,Dispatchers.Main
в отличии отDispatchers.Main.immediate
всегда делает переключение. -
Объект
Continuation
родительской корутины никак не связан сContinuation
объектами дочерних корутин, поэтому когда завершатся последние результат не будет проброшен обратно вUiScopeParentBlock
, да и в этом особо нет смысла, как например сwithContext()
, который гарантирует последовательный порядок выполнения с возвращением результата. -
Диспатчеры в принципе не могут гарантировать порядок выполнения, так как выполняют код асинхронно, тот же метод
Handler.post()
из Android не даёт 100% уверенности, что код всегда будет выполняться в том порядке, который мы запланировали.
С запуском дочерних корутин через диспатчеры в целом разобрались, но что произойдёт если например у нас диспатчер Dispatchers.Main.immediate
и все корутины выполняются на главном потоке, дочерние корутины ведь не будут переключаться снова через Handler.post()
как это было с Dispatchers.Main
, в таком случае начинает работать так называемый EventLoop
:
// метод из DispatchedContinuation internal inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) if (dispatcher.isDispatchNeeded(context)) { // ... } else { /* executeUnconfined делает одну из двух вещей: 1) выполняет лямбду на EventLoop'е 2) ставит лямбду в очередь EventLoop'а */ executeUnconfined(state, MODE_CANCELLABLE) { continuation.resumeWith(result) } } } private inline fun DispatchedContinuation<*>.executeUnconfined( contState: Any?, mode: Int, doYield: Boolean = false, block: () -> Unit ): Boolean { // EventLoop - штука куда кладутся лямбды в очередь на исполнение val eventLoop = ThreadLocalEventLoop.eventLoop // isUnconfinedLoopActive изначательно равен false поэтому для // родительской корутины срабатывает вторая ветка, а для дочерних - первая return if (eventLoop.isUnconfinedLoopActive) { _state = contState resumeMode = mode // выполнение дочерних корутин ставится в очередь EventLoop'а eventLoop.dispatchUnconfined(this) true } else { /* выполняет Continuation.resumeWith() для родительской корутины, дочерние в этот момент создаются и кладутся в очередь EventLoop'а, после завершения инициализация родительской корутины дочерние по очереди берутся из EventLoop'а и выполняются */ runUnconfinedEventLoop(eventLoop, block = block) false } }
Считайте что EventLoop
это простая очередь задач или лямбд как в нашем случае, куда кладутся запросы на выполнение дочерних корутин, затем они выполняются в том порядке, в котором были добавлены в очередь, как раз такая организация обеспечивает последовательное выполнение корутин:
/* viewModelScope под капотом использует Dispatchers.Main.immediate, который не будет переключать дочерние корутины, так как они и так находятся на главном потоке, поэтому будет задействован механизм EventLoop'а */ viewModelScope.launch { launch { println("I'm the second!") } launch { println("I'm the third!") } /* весь блок кода в родительской корутине выполняется до момента выполнения дочерних корутин, это необходимо чтобы поставить дочерние корутины в очередь, а потом начать их выполнять */ println("I'm the first!") }
В качестве дополнения приведу ещё парочку интересных особенностей EventLoop
‘а:
-
EventLoop
наследуется отCoroutineDispatcher
и может быть использован в корутинах, например так работаетrunBlocking()
-
EventLoop
создаётся для каждого потока чере механизмThreadLocal
переменных, как например экземплярLooper
класса из Android -
Под капотом
EventLoop
лежитArrayDeque
из Kotlin коллекций для формирования очереди задач.
Напоследок рассмотрим как runBlocking()
ждёт завершения своих корутин и соблюдает их порядок, хотя ответ очевиден — используется EventLoop
:
/* EventLoop создаётся на основе переданного CoroutineContext'а и кладётся в специальную корутину BlockingCoroutine, затем вызывается метод joinBlocking(), который ждёт пока все дочерние корутины не выполнятся */ actual fun <T> runBlocking( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T { val currentThread = Thread.currentThread() val eventLoop: EventLoop = ... val newContext: CoroutineContext = ... val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() } private class BlockingCoroutine<T>( parentContext: CoroutineContext, private val blockedThread: Thread, private val eventLoop: EventLoop? ) : AbstractCoroutine<T>(parentContext, true, true) { /* ожидание происходит в while(true) цикле, а любой бесконечный цикл как вы уже догадываетесь блокирует текущий поток, отсюда и название runBlocking чтобы завершить цикл ожидания нужно поменять isCompleted на false, это произойдет только когда все дочерние корутины завершатся */ fun joinBlocking(): T { registerTimeLoopThread() try { eventLoop?.incrementUseCount() try { while (true) { val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE if (isCompleted) break } } finally { // paranoia eventLoop?.decrementUseCount() } } finally { // paranoia unregisterTimeLoopThread() } val state = this.state.unboxState() (state as? CompletedExceptionally)?.let { throw it.cause } return state as T } }
Именно благодаря бесконечному циклу runBlocking()
дожидается выполнения всех дочерних корутин, при этом блокируя текущий поток.
Хотелось бы добавить что порядок в runBlocking()
будет гарантироваться только когда корутины выполняются без каких либо асинхронных вызовов и переключений на другие диспатчеры, например здесь порядок не будет соблюдён:
fun main() = runBlocking<Unit> { launch { val result = withContext(Dispatchers.IO) { delay(500) "I'm the second!" } println(result) } launch { println("I'm the first!") } }
Заключение
Не думал что и сам дотяну до этого момента, статья получилась очень объёмной и без капли самоуверенности заявляю охренительно полезной!
В качестве последних слов я собрал парочку фактов:
-
Корутина — это всего лишь удобная абстракция c крутыми фишками над асинхронным выполнением кода, примером асинхронного выполнения может быть переключение на другой поток, использование очереди главного потока (MessageQueue из Android) и тд.
-
Continuation
— кирпичик на котором построена практически вся библиотека корутин, является простейшим интерфейсом с единственным методомresumeWith()
, данный метод вызывается когда происходит переход между состоянием приостановки корутины и состоянием её выполнения. -
Состояние приостановки — так как корутины позволяют писать асинхронный код в последовательном стиле, то необходим механизм возвращения к точкам выполнения этого кода, в большинстве случаев такой механизм реализуется с помощью callback’ов, которыми как раз и являются
Continuation
реализации. -
К реализациям
Continuation
интерфейса относятся: обычная корутинаStandaloneCoroutine
, сгенерированный suspend блок на базеContinuationImpl
, реализация для работы диспатчеровDispatchedContinuation
, корутина используемая вrunBlocking()
методе —BlockingCoroutine
и другие. -
DispatchedContinuation
оборачивает другиеContinuation
объекты, чтобы передать выполнениеContinuation.resumeWith()
метода диспатчеру. -
CoroutineDispatcher
в большинстве случаев используется для переключения корутины на другой или другие потоки, но и есть и исключения, такие какEventLoop
например, который позволяет выполнить корутины в правильном порядке. -
EventLoop
— это простая очередь задач, куда кладутся запросы на выполнение корутин, а затем они выполняются в том порядке, в котором были добавлены в очередь, такая организация обеспечивает правильный порядок выполнения, но это работает только если ни в одной корутине не будет переключений через диспатчеры.
Полезные ссылки:
Пишите в комментах ваше мнение и всем хорошего кода!
ссылка на оригинал статьи https://habr.com/ru/articles/827866/
Добавить комментарий