Kotlin Coroutines под капотом: отмена корутин

от автора

В прошлой статье я разобрал как работают корутины под капотом, рекомендую ознакомиться прежде чем закапываться дальше. В этой статье мы разберем такую сущность как Job и как на самом деле под капотом отменяются корутины, в общем будет интересно, погнали!

Знакомимся с Job и JobSupport

Черканем небольшой пример:

// parent coroutine launch {      // child coroutine 1     launch {          ...     }     // child coroutine 2     launch {          ...     } }.cancel()

По логике Structured Concurrency отмена родительской корутины должна привезти к отмене дочерних корутин (1 и 2), это одно из самых базовых правил на котором строится библиотека.

Провалимся в метод cancel() и узнаем кому он принадлежит:

interface Job : CoroutineContext.Element {     // добавляет дочернюю Job      fun attachChild(child: ChildJob): ChildHandle        // отменяет Job     fun cancel(cause: CancellationException? = null) }

Метод cancel() принадлежит такой сущности как Job, это один из базовых элементов CoroutineContext'а, отвечающий за одно из следующих состояний корутины:

  1. выполняется

  2. отменена

  3. завершена

  4. завершена с ошибкой

  5. еще не начала выполняться (если это lazy запуск корутины)

В принципе Job можно назвать конечным автоматом — есть какое-то начальное состояние, например «выполняется» и есть возможные переходы в другие, например при выбросе CancellationException корутина перейдет в состояние «отменена» и тд.

С основными определениями разобрались, теперь переходим к базовой реализации Job, она содержится в JobSupport, здесь происходит вся работа с состоянием, также этот класс наследует сама корутина:

abstract class AbstractCoroutine<in T>(     parentContext: CoroutineContext,     initParentJob: Boolean,     active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { ... }

Класс JobSupport достаточно большой, более 1000 строк кода, но нас пока интересует только реализация метода cancel():

// реализация содержится в JobSupport override fun cancel(cause: CancellationException?) {     cancelInternal(cause ?: defaultCancellationException()) }  // дополнительный метод cancelInternal() нужен для переопределения // логики отмены в других корутинах, например в ChannelCoroutine open fun cancelInternal(cause: Throwable) {     cancelImpl(cause) }  // для большинства корутин базовая логика отмены содержится тут internal fun cancelImpl(cause: Any?): Boolean {     var finalState: Any? = COMPLETING_ALREADY     if (onCancelComplete) {         // тут логика токо для CompletableDeferred корутин,          // в этой статье я не буду в них углубляться     }          if (finalState === COMPLETING_ALREADY) {         // нас интересует этот метод         finalState = makeCancelling(cause)     }     // на основе финального состояния возвращаем true, если отмена удалась      // или false если произошла какая-то фигня     return when {         finalState === COMPLETING_ALREADY -> true         finalState === COMPLETING_WAITING_CHILDREN -> true         finalState === TOO_LATE_TO_CANCEL -> false         else -> {             afterCompletion(finalState)             true         }     } }  // здесь происходит отмена корутины private fun makeCancelling(cause: Any?): Any? {     var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)     loopOnState { state ->         when (state) {             ...             // Incomplete это состояние выполнения корутины             is Incomplete -> {                 val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }                 if (state.isActive) {                     // вызывается в случае отмены корутины, наш случай                     if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY                 } else {                     // вызывается в случае стандартного завершения корутины (не отмена)                     val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))                     when {                         finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")                         finalState === COMPLETING_RETRY -> return@loopOnState                         else -> return finalState                     }                 }             }             else -> return TOO_LATE_TO_CANCEL // already complete         }     } }

Прежде чем идти дальше хотел бы уточнить значения следующих констант:

  1. COMPLETING_ALREADY — корутина уже завершена.

  2. COMPLETING_WAITING_CHILDREN — корутина ожидает завершения детей.

  3. COMPLETING_RETRY — корутина не смогла успешно завершится и надо попробовать еще раз.

  4. TOO_LATE_TO_CANCEL — слишком поздно для отмены корутины.

Это вспомогательные состояния на основе которых метод cancelImpl определяет итоговый результат и выполняет завершающее действие (если корутина завершилась):

internal fun cancelImpl(cause: Any?): Boolean {     val finalState = ...        return when {         finalState === COMPLETING_ALREADY -> true         finalState === COMPLETING_WAITING_CHILDREN -> true         finalState === TOO_LATE_TO_CANCEL -> false         else -> {             afterCompletion(finalState)             true         }     } }

Важный момент: вспомогательные состояния не хранятся в Job'е, а вычисляются на лету при выполнении методов таких как cancel() например.

В следующем разделе рассмотрим какие состояния хранятся в Job'е и как они могут быть изменены.

Углубляемся в состояния JobSupport

Для начала приведу простой пример, показывающий зачем вообще нужно хранить состояние для корутины:

class Thread1 : Thread() {      private var isRunning = false      val isActive: Boolean         get() = isRunning          override fun run() {         isRunning = true         while (true) {             // выполняем цикл пока isRunning равен true             if (!isRunning) break             ...         }     }          fun cancel() {           // завершаем цикл         isRunning = false     }    }

Когда поток Thread1 будет отменен, логика внутри него завершится, состояние станет неактивным, на основе этого состояния можно:

  1. Узнать завершился поток или нет.

  2. Выполнить определенную логику для нового состояния (завершить цикл).

Аналогично для корутин, состояние позволяет узнать что происходит с корутиной в данный момент, а также выполнить какую-то логику в зависимости от этого состояния.

Посмотрим на самые часто используемые состояния в JobSupport:

// дополнительные состояния, которые вычисляются на лету (не хранятся в JobSupport) val COMPLETING_ALREADY = Symbol("COMPLETING_ALREADY") val COMPLETING_WAITING_CHILDREN = Symbol("COMPLETING_WAITING_CHILDREN") val COMPLETING_RETRY = Symbol("COMPLETING_RETRY") val TOO_LATE_TO_CANCEL = Symbol("TOO_LATE_TO_CANCEL")  // при запуске корутины по умолчанию ей присваивается состояние EMPTY_ACTIVE val EMPTY_NEW = Empty(false) // для lazy корутин val EMPTY_ACTIVE = Empty(true) // для всех остальных корутин  // корутина в статусе выполнения у которой нет дочерних корутин class Empty(override val isActive: Boolean) : Incomplete { ... }  // практически тоже самое что и Empty, токо дополнительно добавляется // список CompletionHandler'ов, которые будут вызваны при завершении корутины class InactiveNodeList(     override val list: NodeList ) : Incomplete { ... }  // корутина в статусе выполнения у которой есть одна дочерняя корутина class ChildHandleNode(     val childJob: ChildJob ) : JobNode(), ChildHandle { ... }  // корутина в статусе выполнения у которой есть более  // чем одна дочерняя корутина class NodeList : LockFreeLinkedListHead(), Incomplete { ... }  // корутина в статусе отмены или завершения // это не финальное состояние, об этом еще будет полноценный раздел class Finishing(     override val list: NodeList,     isCompleting: Boolean,     rootCause: Throwable? ) : SynchronizedObject(), Incomplete { ... }  // корутина была полностью завершена по причине выброса исключения:  // 1) CancellationException, если корутина была отменена // 2) любые другие ошибки // это финальное состояние, об этом еще будет полноценный раздел class CompletedExceptionally(val cause: Throwable) { ... }  // JobSupport хранит состояние в atomic типе чтобы обезопасить  // корутину от ошибок многопоточности (race condition) val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)

В JobSupport'е хранится только актуальное состояние, им может быть одно из вышеперечисленных.

Если вы хотите в прикладном коде получить сведения о состоянии корутины, можете воспользоваться следующими Boolean полями:

// доступны в корутине, так как корутина наследует JobSupport class JobSupport : Job {      // выполняется корутина или нет     override val isActive: Boolean get() {         val state = this.state         return state is Incomplete && state.isActive     }      // завершена корутина полностью или нет (финальное состояние)     override val isCompleted: Boolean get() {         return state !is Incomplete     }      // отменена корутина или нет     override val isCancelled: Boolean get() {         val state = this.state         return state is CompletedExceptionally || (state is Finishing && state.isCancelling)     }    }

Думаю вы уже использовали одно из них, например вызывали Job.ensureActive() метод:

fun Job.ensureActive(): Unit {     if (!isActive) throw getCancellationException() }

С JobSupport состояниями более менее разобрались, можем снова возвращаться к логике:

internal fun cancelImpl(cause: Any?): Boolean {     ...     val finalState = makeCancelling(cause)     ... }  private fun makeCancelling(cause: Any?): Any? {     when (state) {         ...         is Incomplete -> tryMakeCancelling(state, causeException)     } }                

Я уже приводил код этих методов, можете вернуться в предыдущий раздел если его пропустили, главное понять цепочку вызовов, которая происходит при отмене:

cancel() → cancelInternal() → cancelImpl() → makeCancelling() → tryMakeCancelling()

По итогу вызывается метод JobSupport.tryMakeCancelling(), давайте провалимся туда:

private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {     assert { state !is Finishing } // only for non-finishing states     assert { state.isActive } // only for active states     // берем дочерние Job'ы из текущего состояния корутины     val list = getOrPromoteCancellingList(state) ?: return false     // создаем новое состояние - состояние отмены корутины     val cancelling = Finishing(list, false, rootCause)     // состояние обновляется через потокобезопасный алгоритм "Сравнение с обменом"     // у меня был пост на этот счет: https://t.me/android_under_the_hood/86      if (!_state.compareAndSet(state, cancelling)) return false     // пробрасываем дальше отмену     notifyCancelling(list, rootCause)     return true }

Давайте по порядку, при отмене корутины происходят следующие вещи:

  1. Получение дочерних Job (не забывайте что сама корутина наследует JobSupport и следовательно тоже является Job'ой).

  2. Создание нового состояния Finishing и его применение.

  3. Уведомление родителя и дочерних Job об отмене.

Разберемся подробнее с 1 и 3 пунктами.

Получение дочерних Job

Вспомним вот такой код в AbstractCoroutine:

// при создании новой корутины она добавляется в родительскую Job init {     if (initParentJob) initParentJob(parentContext[Job]) }  protected fun initParentJob(parent: Job?) {     assert { parentHandle == null }     if (parent == null) {         parentHandle = NonDisposableHandle         return     }     parent.start()      // parent.attachChild() добавляет в состояние parent'а      // ссылку на дочернюю Job'у     val handle = parent.attachChild(this)     // parentHandle нужен для отмены родительской корутины,     // еще вернемся к этому     parentHandle = handle     // если корутина была завершена то все зануляем     if (isCompleted) {         handle.dispose()         parentHandle = NonDisposableHandle // release it just in case, to aid GC     } }

Когда создается корутина происходят следующие вещи:

  1. Из CoroutineContext'а берется родительская Job'а и если она есть то добавляет корутину в качестве дочерней Job'ы.

  2. В корутине сохраняется ChildHandle, специальный обработчик который отменит родительскую Job'у если будет выброшена ошибка.

Таким образом между корутинами образуется связь «родитель — ребенок», давайте теперь узнаем как дочерние Job'ы хранятся в родительской и как они извлекаются оттуда:

// дочерние Job'ы берутся из текущего состояния корутины (JobSupport) fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list ?:     when (state) {         is Empty -> NodeList()         is JobNode -> state.toSingleNodeList()         else -> error("State should have list: $state")     }  // при добавлении дочерней Job'ы у родительской меняется состояние fun attachChild(child: ChildJob): ChildHandle {     val node = ChildHandleNode(child)     node.parentJob = this     val state = _state.value     when (state) {         is Empty -> {              _state.value = node         }         is Incomplete -> {             _state.value = when (val list = state.list) {                 null -> node.toSingleNodeList()                 else -> list + node.toSingleNodeList()             }         }     } }

Я упростил JobSupport.attachChild() для понимания сути, если интересно узнать все подробности работы этого метода качайте исходники).

По итогу все сводится к единому состоянию JobSupport, где и хранятся дочерние Job'ы, какое состояние и в каком случае использовать определяется примерно по следующей логике:

  1. Если у корутины нет дочерних Job то она находится в состоянии Empty.

  2. Если у корутины есть одна дочерняя Job то она находится в состоянии JobNode (конкретный класс ChildHandleNode).

  3. Если у корутины несколько дочерних Job то она находится в состоянии NodeList, который хранит дочерние JobNode элементы в связанном списке.

Что ж теперь мы знаем как можно извлечь дочерние Job'ы и знаем о ссылке на родителя parentHandle, теперь остается только уведомить их об отмене.

Уведомление родителя и дочерних Job об отмене

Перед тем как зарываться в исходники важно понять что корутина может отмениться по двум разным причинам:

  1. Корутина может отменится штатно, через вызов Job.cancel() метода, в таком случае создастся / выбросится CancellationException, отменится текущая корутина и отменятся дочерние, в родительскую ничего не будет проброшено.

  2. Корутина может отменится внештатно, например вывалится любое исключение отличное от CancellationException, в таком случае отменится текущая корутина, отменятся дочерние и если у вас не указан SupervisorJob то отмена также будет прокинута родителю.

Посмотрим как это реализовано в коде:

private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {     ...     // уведомляем родителя и детей об отмене     // list это список Job, взятый из текущего состояния корутины (JobSupport)     notifyCancelling(list, rootCause)     return true }  private fun notifyCancelling(list: NodeList, cause: Throwable) {     ...     // прокидывание отмены в дочерние Job'ы     notifyHandlers(list, cause) { it.onCancelling }     // прокидывание отмены в родительскую Job'у     cancelParent(cause) }  // уведомление дочерних Job об отмене private inline fun notifyHandlers(list: NodeList, cause: Throwable?, predicate: (JobNode) -> Boolean) {     var exception: Throwable? = null     list.forEach { node ->         // пробегаемся по дочерним Job'ам и отменяем их         // как мы уже знаем конкретным JobNode узлом для          // дочерних корутин является ChildHandleNode         if (node is JobNode && predicate(node)) {             try {                 node.invoke(cause)             } catch (ex: Throwable) {                 exception?.apply { addSuppressed(ex) } ?: run {                     exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)                 }             }         }     }     // если при оповещении дочерних JobNode узлов об отмене произошло      // какое-то исключение, то оно обработается в CoroutineExceptionHandler'е,      // реализация которого по умолчанию завершает программу со stacktrace'ом     exception?.let { handleOnCompletionException(it) } }  // уведомление родительской Job'ы об отмене private fun cancelParent(cause: Throwable): Boolean {     // isScopedCoroutine равен true только для корутин, созданных      // через coroutineScope { ... } билдер, такие корутины работают      // как suspend блок: пробрасывают исключение через конструкцию      // throw exception, поэтому нет необходимости обращаться к      // родительской корутине через parentHandle     if (isScopedCoroutine) return true      // простая проверка: штатная отмена или нет     val isCancellation = cause is CancellationException     val parent = parentHandle     // если родителя нет, ничего с ним делать не надо     if (parent === null || parent === NonDisposableHandle) {         return isCancellation     }      // оповещаем родительскую Job'у об отмене     return parent.childCancelled(cause) || isCancellation }

В конечном итоге будут вызваны:

  1. ChildHandleNode.invoke() для оповещения дочерних Job об отмене — дочерние ChildHandleNode узлы хранятся в текущем состоянии JobSupport.

  2. ChildHandleNode.childCancelled() для оповещения родительской Job'ы об отмене — родительский ChildHandleNode узел хранится в поле parentHandle, которое принадлежит классу JobSupport.

Рассмотрим поближе класс ChildHandleNode:

class ChildHandleNode(     @JvmField val childJob: ChildJob ) : JobNode(), ChildHandle {     override val parent: Job get() = job     override val onCancelling: Boolean get() = true     // при вызове invoke() отменяется дочерняя Job'а     override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)     // при вызове childCancelled() отменяется родительская Job'а     override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) }  open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob {      // вызывается дочерней Job'ой для отмены родительской     open fun childCancelled(cause: Throwable): Boolean {         // родительская Job'а отменяется только в случае внештатной отмены         if (cause is CancellationException) return true         return cancelImpl(cause) && handlesException     }      // вызывается родительской Job'ой для отмены дочерних     override fun parentCancelled(parentJob: ParentJob) {         // дочерняя Job'а отменяется всегда когда отменяется родительская         cancelImpl(parentJob)     }  }  // класс JobImpl наследует JobSupport class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {      // в SupervisorJob родительская Job'а никогда не отменяется     override fun childCancelled(cause: Throwable): Boolean = false    }

Возвращаясь к двум возможным вариантам отмены корутины, приведенных в начале раздела, можем сделать следующие выводы:

  1. Штатная отмена: отмена корутины приведет к вызову JobSupport.cancelImpl() у дочерних Job, которые будут прокидывать отмену все ниже и ниже по дереву пока не отменятся все дочерние корутины, по итогу отмененные корутины будут находиться в состоянии Finishing — то есть в состоянии отмены.

  2. Внештатная отмена: отмена корутины приведет в вызову JobSupport.cancelImpl() у дочерних Job, которые отменятся аналогично штатной отмене, в добавок если вы не указали SupervisorJob отмена будет прокидываться родителям все выше и выше по дереву пока не дойдет до самого главного, где исключение будет прокинуто в CoroutineExceptionHandler (по умолчанию установлен CoroutineExceptionHandler, который крашит программу со stacktrace’ом).

Вроде бы разобрались с отменой, но как будто чего-то не хватает, да мы сбросили состояние корутины на Finishing, но не разобрались как происходит фактическая отмена определенной работы, например как отменяется функция delay() или как отменяются suspend функции, что ж пора и в этом разобраться.

Как отменяется функция delay()

Освежу память простейшим примером:

val job = launch {     // должен отменится через 200 миллисекунд     delay(1_000) } delay(200) // отменяем корутину вместе с delay() вызовом job.cancel()

В прошлых разделах мы узнали что в самой корутине нет никакой фактической отмены логики, а только управление состоянием и проброс отмены родительской / дочерним корутинам, поэтому лезем в исходники delay() функции:

suspend fun delay(timeMillis: Long) {     if (timeMillis <= 0) return     // приостанавливает корутину до тех пор пока      // не вызовется Continuation.resumeWith(), это произойдет     // спустя некоторое время timeMillis, так в принципе и работает delay()     return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->         // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.         if (timeMillis < Long.MAX_VALUE) {             cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)         }     } }  // реализация delay() из Android диспатчера HandlerContext,  // который работает на главном потоке приложения override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {     val block = Runnable {         with(continuation) { resumeUndispatched(Unit) }     }     // ставим задачу в очередь главного потока, второй параметр     // определяет через сколько миллисекунд она будет выполнена     if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {         // CancellableContinuation позволяет добавить обработчик,         // который будет вызван при отмене корутины,          // в нашем случае это нужно для удаления задачи из очереди         continuation.invokeOnCancellation { handler.removeCallbacks(block) }     } }

Осталось только узнать кто вызывает обработчик отмены, добавленный через CancellableContinuation.invokeOnCancellation() метод. Чтобы разобраться в этом провалимся в функцию suspendCancellableCoroutine:

suspend inline fun <T> suspendCancellableCoroutine(     crossinline block: (CancellableContinuation<T>) -> Unit ): T =     suspendCoroutineUninterceptedOrReturn { uCont ->         // создается отменяемый Continuation и в него оборачивается текущий,         // чтобы в дальнейшем вернуть результат обратно         val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)         // тут происходит добавление CancellableContinuationImpl в качестве         // дочернего JobNode узла для текущей корутины         cancellable.initCancellability()         block(cancellable)         // если можем сразу вернуть результат возвращаем его          // или приостанавливаем корутину если это невозможно         cancellable.getResult()     }  // метод из CancellableContinuationImpl fun initCancellability() {     val handle = installParentHandle() ?: return      ... }  // метод из CancellableContinuationImpl private fun installParentHandle(): DisposableHandle? {     // берем родительскую Job'у, ей является корутина     val parent = context[Job] ?: return null     // добавляем в состояние родительской Job'ы новый узел ChildContinuation     val handle = parent.invokeOnCompletion(handler = ChildContinuation(this))     _parentHandle.compareAndSet(null, handle)     return handle }  private class ChildContinuation(     @JvmField val child: CancellableContinuationImpl<*> ) : JobNode() {     override val onCancelling get() = true      // этот метод будет вызван когда родительская Job'а     // начнет отменяться, вспомните как корутина уведомляет      // об отмене родителей и детей     override fun invoke(cause: Throwable?) {         // отменяется CancellableContinuationImpl         child.parentCancelled(child.getContinuationCancellationCause(job))     } }

Как вы уже знаете при отмене корутины возьмутся все дочерние JobNode узлы из ее текущего состояния и у каждого узла будет вызван метод JobNode.invoke(), в данном случае ChildContinuation является абсолютно таким же узлом, как и ChildHandleNode:

class ChildContinuation(val child: CancellableContinuationImpl<*>) : JobNode() {     override fun invoke(cause: Throwable?) {         // при вызове JobNode.invoke() у CancellationContinuation будет         // вызван метод parentCancelled()         child.parentCancelled(child.getContinuationCancellationCause(job))     } }  // метод из CancellableContinuationImpl internal fun parentCancelled(cause: Throwable) {     if (cancelLater(cause)) return     // по итогу произойдет обычная отмена через      // CancellableContinuationImpl.cancel() метод     cancel(cause)     // Even if cancellation has failed, we should detach child to avoid potential leak     detachChildIfNonResuable() }

Ну и собственно как происходит отмена обработчиков в CancellableContinuationImpl:

// метод из CancellableContinuationImpl override fun cancel(cause: Throwable?): Boolean {     _state.loop { state ->         if (state !is NotCompleted) return false // false if already complete or cancelling         // состояние меняется на отмененное         val update = CancelledContinuation(this, cause, handled = state is CancelHandler || state is Segment<*>)         if (!_state.compareAndSet(state, update)) return@loop         // вызывает обработчик отмены, зарегистрированный через          // invokeOnCancellation() метод         when (state) {             is CancelHandler -> callCancelHandler(state, cause)             is Segment<*> -> callSegmentOnCancellation(state, cause)         }         ...         return true     } }  override fun invokeOnCancellation(handler: CompletionHandler) =     // добавляет в CancellableContinuationImpl обработчик,     // вызываемый при отмене корутины     invokeOnCancellation(CancelHandler.UserSupplied(handler))

Суммируем как происходит отмена функции delay():

  1. Сначала отменяется сама корутина — извлекаются дочерние JobNode узлы из текущего состояния и оно меняется на Finishing.

  2. Для каждого дочернего JobNode узла вызывается JobNode.invoke() метод — JobNode узлы содержат ссылки не только на корутиновские Job'ы, но и на CancellableContinuation сущности, CancellableContinuation используется в функции delay().

  3. При вызове JobNode.invoke() для CancellableContinuation, будут вызваны все обработчики, добавленные через CancellableContinuation.invokeOnCancellation() метод, в том числе и обработчик добавленный в функции delay().

  4. Обработчик в функции delay() отменит задержку — в Android реализации для этого используется Handler.removeCallbacks() метод, он удалит задачу из очереди главного потока.

Как отменяются suspend функции

Важно понимать что не во всех ситуациях suspend функции могут быть отменены, например здесь это невозможно:

suspend fun sleep() {     Thread.sleep(500) }  suspend fun welcome() {     println("Welcome!") }  val job = launch(Dispatchers.IO) {     sleep()     welcome() }  delay(100)  // ничего не отменится, выполнится sleep() вместе с welcome() job.cancel()

Так происходит потому что ни одна из suspend функций не возвращает состояния приостановки и следовательно код в suspend блоке выполняется синхронно. Если вы уже забыли из прошлой статьи, напоминаю что для каждого suspend блока, запущенного в корутине будет сгенерирован специальный switch case, в нашем случае он будет вот таким:

class GeneratedContinuationImplementation(...) : ... {      override fun resumeWith(result: Result<Any?>) {         val outcome: Result<Any?> =             try {                 // при хорошо спроектированных suspend функциях,                 // каждый вызов должен приводить к COROUTINE_SUSPENDED состоянию                 // после того как функция завершит свое выполнение она вызовет                  // Continuation.resumeWith() этого Continuation'а и выполнится                  // следующая suspend функция в switch case блоке                 val outcome = invokeSuspend(param)                 if (outcome === COROUTINE_SUSPENDED) return                 Result.success(outcome)             } catch (exception: Throwable) {                 Result.failure(exception)         }         // после завершения suspend блока должна завершится сама корутина         coroutine.resumeWith(outcome)     }      fun sleep(cont: Continuation<Any?>): Any {         Thread.sleep(500)         // нет состояния приостановки COROUTINE_SUSPENDED,         // код выполняется синхронно         return Unit     }      fun welcome(cont: Continuation<Any?>): Any {         // тут также нет состояния приостановки         println("Welcome!")         return Unit     }      // я убрал switch case для простоты, логика абсолютно такая же     override fun invokeSuspend(result: Result<Any?>): Any {         // так как нет состояния приостановки все методы          // выполняются друг за другом         sleep(this)         welcome(this)         // завершение suspend блока         return result     }  }

Обязательно вернитесь к прошлой статье если ничего непонятно по suspend блоку, там подробно это разжевано, ну а мы попробуем починить этот код чтобы он стал отменяемым:

suspend fun sleep() {     // я только добавил withContext вызов     withContext(Dispatchers.IO) {         Thread.sleep(500)     } }  suspend fun welcome() {     println("Welcome!") }  val job = launch {     sleep()     welcome() }  delay(100)  // выполнится только sleep(), а welcome() будет отменен job.cancel()

Вуаля! Мы добавили withContext() вызов, который под капотом создает DispatchedCoroutine, где выполняется suspend блок с Thread.sleep() вызовом, смотрим как это будет работать:

class GeneratedContinuationImplementation(...) : ... {      override fun resumeWith(result: Result<Any?>) {         val outcome: Result<Any?> =             try {                 // при хорошо спроектированных suspend функциях,                 // каждый вызов должен приводить к COROUTINE_SUSPENDED состоянию                 // после того как функция завершит свое выполнение она вызовет                  // Continuation.resumeWith() этого Continuation'а и выполнится                  // следующая suspend функция в switch case блоке                 val outcome = invokeSuspend(param)                 if (outcome === COROUTINE_SUSPENDED) return                 Result.success(outcome)             } catch (exception: Throwable) {                 // вот тут обрабатывается CancellationException,                 // поэтому если вы напишите свой try / catch в suspend блоке                 // и перехватите это исключение, у вас может сломаться                  // отмена корутины                 Result.failure(exception)         }         // после завершения suspend блока должна завершится сама корутина         coroutine.resumeWith(outcome)     }      fun sleep(cont: Continuation<Any?>): Any {         // создаем DispatchedCoroutine, под капотом она добавится         // как дочерняя Job'а в родительскую корутину         val coroutine = DispatchedCoroutine(...)         // для простоты не стал расписывать логику диспатчеризации,          // она есть в прошлой статье          coroutine.runSuspendBlockInIOThread {             // выполняем какую-то работу на IO потоке             Thread.sleep(500)             // также упростил логику возвращения на предыдущий поток             coroutine.runResumeInPreviousThread {                 // выходим из состояния приостановки                 // result вычисляется на основе текущего состояния                  // DispatchedCoroutine                   cont.resumeWith(result)             }         }         // до вызова cont.resumeWith() метод будет находиться          // в состоянии приостановки         return COROUTINE_SUSPENDED     }      fun welcome(cont: Continuation<Any?>): Any {         // эта функция остается без изменений         println("Welcome!")         return Unit     }      var label = 0        override fun invokeSuspend(result: Result<Any?>): Any {         when(label) {             0 -> {                 result.throwOnFailure()                 sleep(this)                 return COROUTINE_SUSPENDED             }             1 -> {                 // result может содержать исключение CancellationException,                 // которое должно будет прокинуться выше и отменить выполнение                 // suspend блока                 result.throwOnFailure()                 welcome()             }         }         return result     }  }

В чем собственно заключается отмена? Да все просто, любая suspend функция, которая хочет отменить дальнейшее выполнение других функций в suspend блоке должна вернуть результат с исключением:

fun welcome(continuation: Continuation<Any>): Any {     continuation.resumeWith(Result.failure(CancellationException("Отмени suspend блок!")))     return COROUTINE_SUSPENDED }

Как мы уже выяснили такое возможно только если такая функция находится в состоянии приостановки, в противном случае suspend блок продолжит выполняться.

После того как Continuation, сгенерированный для suspend блока, получил результат с исключением, он выбросит это исключение через вызов Result.throwOnFailure() и завершит выполнение всего switch case блока.

Остается только найти место где возвращается результат с исключением, в нашем случае это находится вот тут:

fun sleep(cont: Continuation<Any?>): Any {     val coroutine = DispatchedCoroutine(...)     coroutine.runSuspendBlockInIOThread {         Thread.sleep(500)         coroutine.runResumeInPreviousThread {             // result берется из текущего состояния DispatchedCoroutine             // и он как раз содержит CancellationException, когда корутина             // была отменена             cont.resumeWith(result)         }     }       return COROUTINE_SUSPENDED }  val job = launch {     // в sleep() создается DispatchedCoroutine и добавляется      // как дочерний узел ChildHandleNode в текущую корутину     sleep() }  delay(100)  // при отмене корутины как вы уже знаете все дочерние узлы ChildHandleNode // тоже буду отменяться, следовательно DispatchedCoroutine перейдет в состояние // Finishing, содержащее CancellationException, это исключение  // и пробросится в сгенерированный Continuation для suspend блока job.cancel()

Вот таким нехитрым способом происходит отмена suspend функций, хотя скорее suspend блока, не забывайте что отмениться может только запланированный код (следующие suspend функции) и только в случае состояния приостановки текущей.

Получается, что никакой магии нет.

Finishing это не последнее состояние корутины?

Из предыдущего раздела мы увидели что после отмены корутины, отменится дальнейшее выполнение suspend блока, а результат вернется обратно в корутину:

class GeneratedContinuationImplementation(...) : ... {      override fun resumeWith(result: Result<Any?>) {         val outcome: Result<Any?> = ...         // возвращаем результат обратно в корутину         coroutine.resumeWith(outcome)     }      ... }

Это сделано для того чтобы корутина выполнила завершающие действия и перешла в конечное состояние:

override fun resumeWith(result: Result<T>) {     val state = makeCompletingOnce(result.toState())     if (state === COMPLETING_WAITING_CHILDREN) return     afterResume(state) }  fun makeCompletingOnce(proposedUpdate: Any?): Any? {     loopOnState { state ->         val finalState = tryMakeCompleting(state, proposedUpdate)         ...     } }  // я упростил логику до одного tryMakeCompleting() метода, // если хотите досконально разобраться во всех нюансах  // читайте исходники JobSupport.tryMakeCompleting() fun tryMakeCompleting(state: Any?, result: Any?) {     if (state !is Incomplete) return COMPLETING_ALREADY      // получаем дочерние Job'ы     val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY      // получаем состояние завершения или создаем новое для отмены дочерних узлов     val finishing = state as? Finishing ?: Finishing(list, false, null)      // тут извлекается искючение либо из текущего состояния корутины state      // либо из пришедшего результата result, его может не быть если     // корутина завершается сама без отмен и ошибок     val notifyRootCause: Throwable? = ...      // если пришло исключение отмены или внештатная ошибка      // нужно отменить дочерние Job'ы      notifyRootCause?.let { notifyCancelling(list, it) }      // текущая корутина будет ждать пока выполняться / отменятся дочерние корутины     val child = list.nextChild()     if (child != null && tryWaitForChild(finishing, child, proposedUpdate))         return COMPLETING_WAITING_CHILDREN      // вычисляются финальное исключение и финальное состояние,      // обычно если корутина завершается штатно конечным состоянием будет:     // Unit для launch корутин,      // Any для withContext корутин,      // если с ошибкой / отменой то CompletedExceptionally     val finalException = ...     val finalState = ...             if (finalException != null) {         // для отмены cancelParent() всегда возращает true,         // в случае          // внештатного исключения оно будет прокидываться до главного          // родителя, где вызовется метод handleJobException() и         // исключение обработается через CoroutineExceptionHandler:         // по умолчанию установлен CoroutineExceptionHandler,         // который крашит программу со stacktrace'ом         val handled = cancelParent(finalException) || handleJobException(finalException)         if (handled) (finalState as CompletedExceptionally).makeHandled()     }      // устанавливаем финальное состояние     _state.compareAndSet(state, finalState.boxIncomplete())            return finalState }

В итоге после завершения корутины у нее будет одно из следующих состояний:

  1. Unit — для штатного завершения корутин, запущенных через launch билдер (StandaloneCoroutine).

  2. CompletedExceptionally — для отмененной корутины или завершенной внештатно (исключение отличное от CancellationException).

  3. Любой другой тип — для корутин, запущенных через withContext билдер (DispatchedCorotuine).

С финальным состоянием разобрались, теперь посмотрим как родительская корутина дожидается завершения своих детей, предположим у нас есть следующий код:

// корутина 1 launch {     // корутина 2     launch(Dispatchers.IO) {         ...     }     // корутина 3     launch(Dispatchers.IO) {         ...     } }

Не секрет что запуск корутин 2 и 3 произойдет без приостановки так как тут не надо дожидаться результата, а это значит что suspend блок корутины 1 сразу вернет управление через вызов Continuation.resumeWith(), где как мы уже выяснили корутина должна будет завершится.

Чтобы корутина 1 дождалась детей, есть специальная логика:

fun tryMakeCompleting(state: Any?, result: Any?) {     ...        // текущая корутина будет ждать пока выполняться / отменятся дочерние корутины     val child = list.nextChild()     if (child != null && tryWaitForChild(finishing, child, proposedUpdate))         return COMPLETING_WAITING_CHILDREN      ...        // только после завершения детей корутина поменяет свое состояние      // на полностью завершенное     _state.compareAndSet(state, finalState.boxIncomplete())      ... }  private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {     // в дочернюю Job'у будет добавлен специальный CompletionHandler,     // который вызовется когда она будет завершаться     val handle = child.childJob.invokeOnCompletion(         invokeImmediately = false,         handler = ChildCompletion(this, state, child, proposedUpdate)     )     // если дочерняя Job'а еще не завершена ждем     if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it     // иначе смотрим следующую дочернюю Job'у     val nextChild = child.nextChild() ?: return false     return tryWaitForChild(state, nextChild, proposedUpdate) }  private class ChildCompletion(     private val parent: JobSupport,     private val state: Finishing,     private val child: ChildHandleNode,     private val proposedUpdate: Any? ) : JobNode() {     override val onCancelling get() = false     // когда дочерняя Job'а будет завершаться она вызовет этот обработчик     override fun invoke(cause: Throwable?) {         // родительская корутина продолжит завершаться          parent.continueCompleting(state, child, proposedUpdate)     } }

Родительская корутина будет ждать завершения детей и пока последняя дочерняя корутина не вызовет ChildCompletion.invoke() обработчик, она не перейдет в финальное состояние.

P.S. Спасибо чуваку @manwtein что упомянул в комментах про ожидание дочерних корутин в родителе!

Итоги

Ух, думал не выдержу до конца этого длинного текста, но чем заниматься в Новогодние праздники если не писать суперсложную техническую статью?).

Давайте подведем итоги:

  1. Job — элемент CoroutineContext'а, отвечающий за хранение одного из состояний корутины: выполнение, отмена, завершение, завершение с ошибкой, для lazy корутин есть еще состояние «не начала выполняться».

  2. В Job есть специальные Boolean поля, которые могут дать информацию о текущем состоянии корутины: isActive — выполняется корутина или нет, isCompleted — завершена корутина полностью или нет, isCancelled — отменена корутина или нет. 

  3. JobSupport — это одна из базовых реализаций Job'ы, от которой наследуются другие реализации такие как SupervisorJob например.

  4. JobSupport может содержать ссылку на родительскую Job'у, если у корутины есть родитель, это нужно для пробрасывания отмены родителю в случае внештатной отмены.

  5. JobSupport состояние хранит в себе дочерние Job’ы по следующей логике: если нет дочерних Job, хранится Empty состояние; если есть одна дочерняя Job'а хранится JobNode узел, конкретный класс — ChildHandleNode; если несколько дочерних Job хранится состояние NodeList, где дочерние JobNode элементы лежат в связанном списке.

  6. Класс корутины StandaloneCoroutine является наследником JobSupport, поэтому вся логика из JobSupport доступна корутине.

  7. Отмена корутины может быть штатной, например при вызове Job.cancel(), в таком случае отменятся только дочерние корутины, отмена не будет пробрасываться родителю.

  8. Отмена корутины может быть внештатной, например при выбросе любого исключения отличного от CancellationException, в таком случае отменятся дочерние корутины и отмена будет проброшена родителю, который может либо не пробрасывать дальше отмену (если это SupervisorJob), либо пробросить ее до самого главного родителя, где исключение будет обработано CoroutineExceptionHandler'ом.

  9. По умолчанию установлен CoroutineExceptionHandler, который крашит программу со stacktrace’ом.

  10. Фактическая отмена логики осуществляется не в самой корутине, а в правильно организованных suspend функциях таких как delay() или в suspend блоках, содержащих такие функции.

  11. Функция delay() под капотом использует CancellableContinuation, который добавляется в родительскую корутину как дочерняя Job'а и при отмене корутины аналогично дочерним корутинам отменяется. Самая главная фишка CancellableContinuation в том что при отмене он вызывает специальные обработчики, их можно добавить через метод CancellableContinuation.invokeOnCancellation(), в функции delay() такой обработчик отменяет задержку.

  12. Отменить можно только тот suspend блок где suspend функции могут быть в приостановленном состоянии, в противном случае код выполнится как один большой метод.

  13. После отмены или завершения корутины она переходит в одно из финальных состояний: Unit — для штатного завершения корутин, запущенных через launch билдер (StandaloneCoroutine), CompletedExceptionally — для отмененной корутины или завершенной внештатно, любой другой тип — для корутин, запущенных через withContext билдер (DispatchedCorotuine).

Полезные ссылки:

  1. Мой технический телеграм канал

  2. Мой личный блог обо всем

  3. Предыдущая статья по корутинам

  4. Курс по корутинам от Android Broadcast

  5. Официальная дока

  6. Крутой доклад от создателя библиотеки

  7. Исходный код корутин

Пишите в комментах ваше мнение и всем хорошего кода!


ссылка на оригинал статьи https://habr.com/ru/articles/873076/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *