В прошлой статье я разобрал как работают корутины под капотом, рекомендую ознакомиться прежде чем закапываться дальше. В этой статье мы разберем такую сущность как Job и как на самом деле под капотом отменяются корутины, в общем будет интересно, погнали!
Знакомимся с Job и JobSupport
Черканем небольшой пример:
// parent coroutine launch { // child coroutine 1 launch { ... } // child coroutine 2 launch { ... } }.cancel()
По логике Structured Concurrency отмена родительской корутины должна привезти к отмене дочерних корутин (1 и 2), это одно из самых базовых правил на котором строится библиотека.
Провалимся в метод cancel()
и узнаем кому он принадлежит:
interface Job : CoroutineContext.Element { // добавляет дочернюю Job fun attachChild(child: ChildJob): ChildHandle // отменяет Job fun cancel(cause: CancellationException? = null) }
Метод cancel()
принадлежит такой сущности как Job
, это один из базовых элементов CoroutineContext'а
, отвечающий за одно из следующих состояний корутины:
-
выполняется
-
отменена
-
завершена
-
завершена с ошибкой
-
еще не начала выполняться (если это lazy запуск корутины)
В принципе Job
можно назвать конечным автоматом — есть какое-то начальное состояние, например «выполняется» и есть возможные переходы в другие, например при выбросе CancellationException
корутина перейдет в состояние «отменена» и тд.
С основными определениями разобрались, теперь переходим к базовой реализации Job
, она содержится в JobSupport
, здесь происходит вся работа с состоянием, также этот класс наследует сама корутина:
abstract class AbstractCoroutine<in T>( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { ... }
Класс JobSupport
достаточно большой, более 1000 строк кода, но нас пока интересует только реализация метода cancel()
:
// реализация содержится в JobSupport override fun cancel(cause: CancellationException?) { cancelInternal(cause ?: defaultCancellationException()) } // дополнительный метод cancelInternal() нужен для переопределения // логики отмены в других корутинах, например в ChannelCoroutine open fun cancelInternal(cause: Throwable) { cancelImpl(cause) } // для большинства корутин базовая логика отмены содержится тут internal fun cancelImpl(cause: Any?): Boolean { var finalState: Any? = COMPLETING_ALREADY if (onCancelComplete) { // тут логика токо для CompletableDeferred корутин, // в этой статье я не буду в них углубляться } if (finalState === COMPLETING_ALREADY) { // нас интересует этот метод finalState = makeCancelling(cause) } // на основе финального состояния возвращаем true, если отмена удалась // или false если произошла какая-то фигня return when { finalState === COMPLETING_ALREADY -> true finalState === COMPLETING_WAITING_CHILDREN -> true finalState === TOO_LATE_TO_CANCEL -> false else -> { afterCompletion(finalState) true } } } // здесь происходит отмена корутины private fun makeCancelling(cause: Any?): Any? { var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause) loopOnState { state -> when (state) { ... // Incomplete это состояние выполнения корутины is Incomplete -> { val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it } if (state.isActive) { // вызывается в случае отмены корутины, наш случай if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY } else { // вызывается в случае стандартного завершения корутины (не отмена) val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException)) when { finalState === COMPLETING_ALREADY -> error("Cannot happen in $state") finalState === COMPLETING_RETRY -> return@loopOnState else -> return finalState } } } else -> return TOO_LATE_TO_CANCEL // already complete } } }
Прежде чем идти дальше хотел бы уточнить значения следующих констант:
-
COMPLETING_ALREADY
— корутина уже завершена. -
COMPLETING_WAITING_CHILDREN
— корутина ожидает завершения детей. -
COMPLETING_RETRY
— корутина не смогла успешно завершится и надо попробовать еще раз. -
TOO_LATE_TO_CANCEL
— слишком поздно для отмены корутины.
Это вспомогательные состояния на основе которых метод cancelImpl
определяет итоговый результат и выполняет завершающее действие (если корутина завершилась):
internal fun cancelImpl(cause: Any?): Boolean { val finalState = ... return when { finalState === COMPLETING_ALREADY -> true finalState === COMPLETING_WAITING_CHILDREN -> true finalState === TOO_LATE_TO_CANCEL -> false else -> { afterCompletion(finalState) true } } }
Важный момент: вспомогательные состояния не хранятся в Job'е
, а вычисляются на лету при выполнении методов таких как cancel()
например.
В следующем разделе рассмотрим какие состояния хранятся в Job'е
и как они могут быть изменены.
Углубляемся в состояния JobSupport
Для начала приведу простой пример, показывающий зачем вообще нужно хранить состояние для корутины:
class Thread1 : Thread() { private var isRunning = false val isActive: Boolean get() = isRunning override fun run() { isRunning = true while (true) { // выполняем цикл пока isRunning равен true if (!isRunning) break ... } } fun cancel() { // завершаем цикл isRunning = false } }
Когда поток Thread1
будет отменен, логика внутри него завершится, состояние станет неактивным, на основе этого состояния можно:
-
Узнать завершился поток или нет.
-
Выполнить определенную логику для нового состояния (завершить цикл).
Аналогично для корутин, состояние позволяет узнать что происходит с корутиной в данный момент, а также выполнить какую-то логику в зависимости от этого состояния.
Посмотрим на самые часто используемые состояния в JobSupport
:
// дополнительные состояния, которые вычисляются на лету (не хранятся в JobSupport) val COMPLETING_ALREADY = Symbol("COMPLETING_ALREADY") val COMPLETING_WAITING_CHILDREN = Symbol("COMPLETING_WAITING_CHILDREN") val COMPLETING_RETRY = Symbol("COMPLETING_RETRY") val TOO_LATE_TO_CANCEL = Symbol("TOO_LATE_TO_CANCEL") // при запуске корутины по умолчанию ей присваивается состояние EMPTY_ACTIVE val EMPTY_NEW = Empty(false) // для lazy корутин val EMPTY_ACTIVE = Empty(true) // для всех остальных корутин // корутина в статусе выполнения у которой нет дочерних корутин class Empty(override val isActive: Boolean) : Incomplete { ... } // практически тоже самое что и Empty, токо дополнительно добавляется // список CompletionHandler'ов, которые будут вызваны при завершении корутины class InactiveNodeList( override val list: NodeList ) : Incomplete { ... } // корутина в статусе выполнения у которой есть одна дочерняя корутина class ChildHandleNode( val childJob: ChildJob ) : JobNode(), ChildHandle { ... } // корутина в статусе выполнения у которой есть более // чем одна дочерняя корутина class NodeList : LockFreeLinkedListHead(), Incomplete { ... } // корутина в статусе отмены или завершения // это не финальное состояние, об этом еще будет полноценный раздел class Finishing( override val list: NodeList, isCompleting: Boolean, rootCause: Throwable? ) : SynchronizedObject(), Incomplete { ... } // корутина была полностью завершена по причине выброса исключения: // 1) CancellationException, если корутина была отменена // 2) любые другие ошибки // это финальное состояние, об этом еще будет полноценный раздел class CompletedExceptionally(val cause: Throwable) { ... } // JobSupport хранит состояние в atomic типе чтобы обезопасить // корутину от ошибок многопоточности (race condition) val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
В JobSupport'е
хранится только актуальное состояние, им может быть одно из вышеперечисленных.
Если вы хотите в прикладном коде получить сведения о состоянии корутины, можете воспользоваться следующими Boolean полями:
// доступны в корутине, так как корутина наследует JobSupport class JobSupport : Job { // выполняется корутина или нет override val isActive: Boolean get() { val state = this.state return state is Incomplete && state.isActive } // завершена корутина полностью или нет (финальное состояние) override val isCompleted: Boolean get() { return state !is Incomplete } // отменена корутина или нет override val isCancelled: Boolean get() { val state = this.state return state is CompletedExceptionally || (state is Finishing && state.isCancelling) } }
Думаю вы уже использовали одно из них, например вызывали Job.ensureActive()
метод:
fun Job.ensureActive(): Unit { if (!isActive) throw getCancellationException() }
С JobSupport
состояниями более менее разобрались, можем снова возвращаться к логике:
internal fun cancelImpl(cause: Any?): Boolean { ... val finalState = makeCancelling(cause) ... } private fun makeCancelling(cause: Any?): Any? { when (state) { ... is Incomplete -> tryMakeCancelling(state, causeException) } }
Я уже приводил код этих методов, можете вернуться в предыдущий раздел если его пропустили, главное понять цепочку вызовов, которая происходит при отмене:
cancel() → cancelInternal() → cancelImpl() → makeCancelling() → tryMakeCancelling()
По итогу вызывается метод JobSupport.tryMakeCancelling()
, давайте провалимся туда:
private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean { assert { state !is Finishing } // only for non-finishing states assert { state.isActive } // only for active states // берем дочерние Job'ы из текущего состояния корутины val list = getOrPromoteCancellingList(state) ?: return false // создаем новое состояние - состояние отмены корутины val cancelling = Finishing(list, false, rootCause) // состояние обновляется через потокобезопасный алгоритм "Сравнение с обменом" // у меня был пост на этот счет: https://t.me/android_under_the_hood/86 if (!_state.compareAndSet(state, cancelling)) return false // пробрасываем дальше отмену notifyCancelling(list, rootCause) return true }
Давайте по порядку, при отмене корутины происходят следующие вещи:
-
Получение дочерних
Job
(не забывайте что сама корутина наследуетJobSupport
и следовательно тоже являетсяJob'ой
). -
Создание нового состояния
Finishing
и его применение. -
Уведомление родителя и дочерних
Job
об отмене.
Разберемся подробнее с 1 и 3 пунктами.
Получение дочерних Job
Вспомним вот такой код в AbstractCoroutine
:
// при создании новой корутины она добавляется в родительскую Job init { if (initParentJob) initParentJob(parentContext[Job]) } protected fun initParentJob(parent: Job?) { assert { parentHandle == null } if (parent == null) { parentHandle = NonDisposableHandle return } parent.start() // parent.attachChild() добавляет в состояние parent'а // ссылку на дочернюю Job'у val handle = parent.attachChild(this) // parentHandle нужен для отмены родительской корутины, // еще вернемся к этому parentHandle = handle // если корутина была завершена то все зануляем if (isCompleted) { handle.dispose() parentHandle = NonDisposableHandle // release it just in case, to aid GC } }
Когда создается корутина происходят следующие вещи:
-
Из
CoroutineContext'а
берется родительскаяJob'а
и если она есть то добавляет корутину в качестве дочернейJob'ы
. -
В корутине сохраняется
ChildHandle
, специальный обработчик который отменит родительскуюJob'у
если будет выброшена ошибка.
Таким образом между корутинами образуется связь «родитель — ребенок», давайте теперь узнаем как дочерние Job'ы
хранятся в родительской и как они извлекаются оттуда:
// дочерние Job'ы берутся из текущего состояния корутины (JobSupport) fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list ?: when (state) { is Empty -> NodeList() is JobNode -> state.toSingleNodeList() else -> error("State should have list: $state") } // при добавлении дочерней Job'ы у родительской меняется состояние fun attachChild(child: ChildJob): ChildHandle { val node = ChildHandleNode(child) node.parentJob = this val state = _state.value when (state) { is Empty -> { _state.value = node } is Incomplete -> { _state.value = when (val list = state.list) { null -> node.toSingleNodeList() else -> list + node.toSingleNodeList() } } } }
Я упростил JobSupport.attachChild()
для понимания сути, если интересно узнать все подробности работы этого метода качайте исходники).
По итогу все сводится к единому состоянию JobSupport
, где и хранятся дочерние Job'ы
, какое состояние и в каком случае использовать определяется примерно по следующей логике:
-
Если у корутины нет дочерних
Job
то она находится в состоянииEmpty
. -
Если у корутины есть одна дочерняя
Job
то она находится в состоянииJobNode
(конкретный классChildHandleNode
). -
Если у корутины несколько дочерних
Job
то она находится в состоянииNodeList
, который хранит дочерниеJobNode
элементы в связанном списке.
Что ж теперь мы знаем как можно извлечь дочерние Job'ы
и знаем о ссылке на родителя parentHandle
, теперь остается только уведомить их об отмене.
Уведомление родителя и дочерних Job об отмене
Перед тем как зарываться в исходники важно понять что корутина может отмениться по двум разным причинам:
-
Корутина может отменится штатно, через вызов
Job.cancel()
метода, в таком случае создастся / выброситсяCancellationException
, отменится текущая корутина и отменятся дочерние, в родительскую ничего не будет проброшено. -
Корутина может отменится внештатно, например вывалится любое исключение отличное от
CancellationException
, в таком случае отменится текущая корутина, отменятся дочерние и если у вас не указанSupervisorJob
то отмена также будет прокинута родителю.
Посмотрим как это реализовано в коде:
private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean { ... // уведомляем родителя и детей об отмене // list это список Job, взятый из текущего состояния корутины (JobSupport) notifyCancelling(list, rootCause) return true } private fun notifyCancelling(list: NodeList, cause: Throwable) { ... // прокидывание отмены в дочерние Job'ы notifyHandlers(list, cause) { it.onCancelling } // прокидывание отмены в родительскую Job'у cancelParent(cause) } // уведомление дочерних Job об отмене private inline fun notifyHandlers(list: NodeList, cause: Throwable?, predicate: (JobNode) -> Boolean) { var exception: Throwable? = null list.forEach { node -> // пробегаемся по дочерним Job'ам и отменяем их // как мы уже знаем конкретным JobNode узлом для // дочерних корутин является ChildHandleNode if (node is JobNode && predicate(node)) { try { node.invoke(cause) } catch (ex: Throwable) { exception?.apply { addSuppressed(ex) } ?: run { exception = CompletionHandlerException("Exception in completion handler $node for $this", ex) } } } } // если при оповещении дочерних JobNode узлов об отмене произошло // какое-то исключение, то оно обработается в CoroutineExceptionHandler'е, // реализация которого по умолчанию завершает программу со stacktrace'ом exception?.let { handleOnCompletionException(it) } } // уведомление родительской Job'ы об отмене private fun cancelParent(cause: Throwable): Boolean { // isScopedCoroutine равен true только для корутин, созданных // через coroutineScope { ... } билдер, такие корутины работают // как suspend блок: пробрасывают исключение через конструкцию // throw exception, поэтому нет необходимости обращаться к // родительской корутине через parentHandle if (isScopedCoroutine) return true // простая проверка: штатная отмена или нет val isCancellation = cause is CancellationException val parent = parentHandle // если родителя нет, ничего с ним делать не надо if (parent === null || parent === NonDisposableHandle) { return isCancellation } // оповещаем родительскую Job'у об отмене return parent.childCancelled(cause) || isCancellation }
В конечном итоге будут вызваны:
-
ChildHandleNode.invoke()
для оповещения дочернихJob
об отмене — дочерниеChildHandleNode
узлы хранятся в текущем состоянииJobSupport
. -
ChildHandleNode.childCancelled()
для оповещения родительскойJob'ы
об отмене — родительскийChildHandleNode
узел хранится в полеparentHandle
, которое принадлежит классуJobSupport
.
Рассмотрим поближе класс ChildHandleNode
:
class ChildHandleNode( @JvmField val childJob: ChildJob ) : JobNode(), ChildHandle { override val parent: Job get() = job override val onCancelling: Boolean get() = true // при вызове invoke() отменяется дочерняя Job'а override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) // при вызове childCancelled() отменяется родительская Job'а override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) } open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob { // вызывается дочерней Job'ой для отмены родительской open fun childCancelled(cause: Throwable): Boolean { // родительская Job'а отменяется только в случае внештатной отмены if (cause is CancellationException) return true return cancelImpl(cause) && handlesException } // вызывается родительской Job'ой для отмены дочерних override fun parentCancelled(parentJob: ParentJob) { // дочерняя Job'а отменяется всегда когда отменяется родительская cancelImpl(parentJob) } } // класс JobImpl наследует JobSupport class SupervisorJobImpl(parent: Job?) : JobImpl(parent) { // в SupervisorJob родительская Job'а никогда не отменяется override fun childCancelled(cause: Throwable): Boolean = false }
Возвращаясь к двум возможным вариантам отмены корутины, приведенных в начале раздела, можем сделать следующие выводы:
-
Штатная отмена: отмена корутины приведет к вызову
JobSupport.cancelImpl()
у дочернихJob
, которые будут прокидывать отмену все ниже и ниже по дереву пока не отменятся все дочерние корутины, по итогу отмененные корутины будут находиться в состоянииFinishing
— то есть в состоянии отмены. -
Внештатная отмена: отмена корутины приведет в вызову
JobSupport.cancelImpl()
у дочернихJob
, которые отменятся аналогично штатной отмене, в добавок если вы не указалиSupervisorJob
отмена будет прокидываться родителям все выше и выше по дереву пока не дойдет до самого главного, где исключение будет прокинуто вCoroutineExceptionHandler
(по умолчанию установленCoroutineExceptionHandler
, который крашит программу со stacktrace’ом).
Вроде бы разобрались с отменой, но как будто чего-то не хватает, да мы сбросили состояние корутины на Finishing
, но не разобрались как происходит фактическая отмена определенной работы, например как отменяется функция delay()
или как отменяются suspend функции, что ж пора и в этом разобраться.
Как отменяется функция delay()
Освежу память простейшим примером:
val job = launch { // должен отменится через 200 миллисекунд delay(1_000) } delay(200) // отменяем корутину вместе с delay() вызовом job.cancel()
В прошлых разделах мы узнали что в самой корутине нет никакой фактической отмены логики, а только управление состоянием и проброс отмены родительской / дочерним корутинам, поэтому лезем в исходники delay()
функции:
suspend fun delay(timeMillis: Long) { if (timeMillis <= 0) return // приостанавливает корутину до тех пор пока // не вызовется Continuation.resumeWith(), это произойдет // спустя некоторое время timeMillis, так в принципе и работает delay() return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> -> // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule. if (timeMillis < Long.MAX_VALUE) { cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) } } } // реализация delay() из Android диспатчера HandlerContext, // который работает на главном потоке приложения override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { val block = Runnable { with(continuation) { resumeUndispatched(Unit) } } // ставим задачу в очередь главного потока, второй параметр // определяет через сколько миллисекунд она будет выполнена if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) { // CancellableContinuation позволяет добавить обработчик, // который будет вызван при отмене корутины, // в нашем случае это нужно для удаления задачи из очереди continuation.invokeOnCancellation { handler.removeCallbacks(block) } } }
Осталось только узнать кто вызывает обработчик отмены, добавленный через CancellableContinuation.invokeOnCancellation()
метод. Чтобы разобраться в этом провалимся в функцию suspendCancellableCoroutine
:
suspend inline fun <T> suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> // создается отменяемый Continuation и в него оборачивается текущий, // чтобы в дальнейшем вернуть результат обратно val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE) // тут происходит добавление CancellableContinuationImpl в качестве // дочернего JobNode узла для текущей корутины cancellable.initCancellability() block(cancellable) // если можем сразу вернуть результат возвращаем его // или приостанавливаем корутину если это невозможно cancellable.getResult() } // метод из CancellableContinuationImpl fun initCancellability() { val handle = installParentHandle() ?: return ... } // метод из CancellableContinuationImpl private fun installParentHandle(): DisposableHandle? { // берем родительскую Job'у, ей является корутина val parent = context[Job] ?: return null // добавляем в состояние родительской Job'ы новый узел ChildContinuation val handle = parent.invokeOnCompletion(handler = ChildContinuation(this)) _parentHandle.compareAndSet(null, handle) return handle } private class ChildContinuation( @JvmField val child: CancellableContinuationImpl<*> ) : JobNode() { override val onCancelling get() = true // этот метод будет вызван когда родительская Job'а // начнет отменяться, вспомните как корутина уведомляет // об отмене родителей и детей override fun invoke(cause: Throwable?) { // отменяется CancellableContinuationImpl child.parentCancelled(child.getContinuationCancellationCause(job)) } }
Как вы уже знаете при отмене корутины возьмутся все дочерние JobNode
узлы из ее текущего состояния и у каждого узла будет вызван метод JobNode.invoke()
, в данном случае ChildContinuation
является абсолютно таким же узлом, как и ChildHandleNode
:
class ChildContinuation(val child: CancellableContinuationImpl<*>) : JobNode() { override fun invoke(cause: Throwable?) { // при вызове JobNode.invoke() у CancellationContinuation будет // вызван метод parentCancelled() child.parentCancelled(child.getContinuationCancellationCause(job)) } } // метод из CancellableContinuationImpl internal fun parentCancelled(cause: Throwable) { if (cancelLater(cause)) return // по итогу произойдет обычная отмена через // CancellableContinuationImpl.cancel() метод cancel(cause) // Even if cancellation has failed, we should detach child to avoid potential leak detachChildIfNonResuable() }
Ну и собственно как происходит отмена обработчиков в CancellableContinuationImpl
:
// метод из CancellableContinuationImpl override fun cancel(cause: Throwable?): Boolean { _state.loop { state -> if (state !is NotCompleted) return false // false if already complete or cancelling // состояние меняется на отмененное val update = CancelledContinuation(this, cause, handled = state is CancelHandler || state is Segment<*>) if (!_state.compareAndSet(state, update)) return@loop // вызывает обработчик отмены, зарегистрированный через // invokeOnCancellation() метод when (state) { is CancelHandler -> callCancelHandler(state, cause) is Segment<*> -> callSegmentOnCancellation(state, cause) } ... return true } } override fun invokeOnCancellation(handler: CompletionHandler) = // добавляет в CancellableContinuationImpl обработчик, // вызываемый при отмене корутины invokeOnCancellation(CancelHandler.UserSupplied(handler))
Суммируем как происходит отмена функции delay()
:
-
Сначала отменяется сама корутина — извлекаются дочерние
JobNode
узлы из текущего состояния и оно меняется наFinishing
. -
Для каждого дочернего
JobNode
узла вызываетсяJobNode.invoke()
метод —JobNode
узлы содержат ссылки не только на корутиновскиеJob'ы
, но и наCancellableContinuation
сущности,CancellableContinuation
используется в функцииdelay()
. -
При вызове
JobNode.invoke()
дляCancellableContinuation
, будут вызваны все обработчики, добавленные черезCancellableContinuation.invokeOnCancellation()
метод, в том числе и обработчик добавленный в функцииdelay()
. -
Обработчик в функции
delay()
отменит задержку — в Android реализации для этого используетсяHandler.removeCallbacks()
метод, он удалит задачу из очереди главного потока.
Как отменяются suspend функции
Важно понимать что не во всех ситуациях suspend функции могут быть отменены, например здесь это невозможно:
suspend fun sleep() { Thread.sleep(500) } suspend fun welcome() { println("Welcome!") } val job = launch(Dispatchers.IO) { sleep() welcome() } delay(100) // ничего не отменится, выполнится sleep() вместе с welcome() job.cancel()
Так происходит потому что ни одна из suspend функций не возвращает состояния приостановки и следовательно код в suspend блоке выполняется синхронно. Если вы уже забыли из прошлой статьи, напоминаю что для каждого suspend блока, запущенного в корутине будет сгенерирован специальный switch case, в нашем случае он будет вот таким:
class GeneratedContinuationImplementation(...) : ... { override fun resumeWith(result: Result<Any?>) { val outcome: Result<Any?> = try { // при хорошо спроектированных suspend функциях, // каждый вызов должен приводить к COROUTINE_SUSPENDED состоянию // после того как функция завершит свое выполнение она вызовет // Continuation.resumeWith() этого Continuation'а и выполнится // следующая suspend функция в switch case блоке val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } // после завершения suspend блока должна завершится сама корутина coroutine.resumeWith(outcome) } fun sleep(cont: Continuation<Any?>): Any { Thread.sleep(500) // нет состояния приостановки COROUTINE_SUSPENDED, // код выполняется синхронно return Unit } fun welcome(cont: Continuation<Any?>): Any { // тут также нет состояния приостановки println("Welcome!") return Unit } // я убрал switch case для простоты, логика абсолютно такая же override fun invokeSuspend(result: Result<Any?>): Any { // так как нет состояния приостановки все методы // выполняются друг за другом sleep(this) welcome(this) // завершение suspend блока return result } }
Обязательно вернитесь к прошлой статье если ничего непонятно по suspend блоку, там подробно это разжевано, ну а мы попробуем починить этот код чтобы он стал отменяемым:
suspend fun sleep() { // я только добавил withContext вызов withContext(Dispatchers.IO) { Thread.sleep(500) } } suspend fun welcome() { println("Welcome!") } val job = launch { sleep() welcome() } delay(100) // выполнится только sleep(), а welcome() будет отменен job.cancel()
Вуаля! Мы добавили withContext()
вызов, который под капотом создает DispatchedCoroutine
, где выполняется suspend блок с Thread.sleep()
вызовом, смотрим как это будет работать:
class GeneratedContinuationImplementation(...) : ... { override fun resumeWith(result: Result<Any?>) { val outcome: Result<Any?> = try { // при хорошо спроектированных suspend функциях, // каждый вызов должен приводить к COROUTINE_SUSPENDED состоянию // после того как функция завершит свое выполнение она вызовет // Continuation.resumeWith() этого Continuation'а и выполнится // следующая suspend функция в switch case блоке val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { // вот тут обрабатывается CancellationException, // поэтому если вы напишите свой try / catch в suspend блоке // и перехватите это исключение, у вас может сломаться // отмена корутины Result.failure(exception) } // после завершения suspend блока должна завершится сама корутина coroutine.resumeWith(outcome) } fun sleep(cont: Continuation<Any?>): Any { // создаем DispatchedCoroutine, под капотом она добавится // как дочерняя Job'а в родительскую корутину val coroutine = DispatchedCoroutine(...) // для простоты не стал расписывать логику диспатчеризации, // она есть в прошлой статье coroutine.runSuspendBlockInIOThread { // выполняем какую-то работу на IO потоке Thread.sleep(500) // также упростил логику возвращения на предыдущий поток coroutine.runResumeInPreviousThread { // выходим из состояния приостановки // result вычисляется на основе текущего состояния // DispatchedCoroutine cont.resumeWith(result) } } // до вызова cont.resumeWith() метод будет находиться // в состоянии приостановки return COROUTINE_SUSPENDED } fun welcome(cont: Continuation<Any?>): Any { // эта функция остается без изменений println("Welcome!") return Unit } var label = 0 override fun invokeSuspend(result: Result<Any?>): Any { when(label) { 0 -> { result.throwOnFailure() sleep(this) return COROUTINE_SUSPENDED } 1 -> { // result может содержать исключение CancellationException, // которое должно будет прокинуться выше и отменить выполнение // suspend блока result.throwOnFailure() welcome() } } return result } }
В чем собственно заключается отмена? Да все просто, любая suspend функция, которая хочет отменить дальнейшее выполнение других функций в suspend блоке должна вернуть результат с исключением:
fun welcome(continuation: Continuation<Any>): Any { continuation.resumeWith(Result.failure(CancellationException("Отмени suspend блок!"))) return COROUTINE_SUSPENDED }
Как мы уже выяснили такое возможно только если такая функция находится в состоянии приостановки, в противном случае suspend блок продолжит выполняться.
После того как Continuation
, сгенерированный для suspend блока, получил результат с исключением, он выбросит это исключение через вызов Result.throwOnFailure()
и завершит выполнение всего switch case блока.
Остается только найти место где возвращается результат с исключением, в нашем случае это находится вот тут:
fun sleep(cont: Continuation<Any?>): Any { val coroutine = DispatchedCoroutine(...) coroutine.runSuspendBlockInIOThread { Thread.sleep(500) coroutine.runResumeInPreviousThread { // result берется из текущего состояния DispatchedCoroutine // и он как раз содержит CancellationException, когда корутина // была отменена cont.resumeWith(result) } } return COROUTINE_SUSPENDED } val job = launch { // в sleep() создается DispatchedCoroutine и добавляется // как дочерний узел ChildHandleNode в текущую корутину sleep() } delay(100) // при отмене корутины как вы уже знаете все дочерние узлы ChildHandleNode // тоже буду отменяться, следовательно DispatchedCoroutine перейдет в состояние // Finishing, содержащее CancellationException, это исключение // и пробросится в сгенерированный Continuation для suspend блока job.cancel()
Вот таким нехитрым способом происходит отмена suspend функций, хотя скорее suspend блока, не забывайте что отмениться может только запланированный код (следующие suspend функции) и только в случае состояния приостановки текущей.
Получается, что никакой магии нет.
Finishing это не последнее состояние корутины?
Из предыдущего раздела мы увидели что после отмены корутины, отменится дальнейшее выполнение suspend блока, а результат вернется обратно в корутину:
class GeneratedContinuationImplementation(...) : ... { override fun resumeWith(result: Result<Any?>) { val outcome: Result<Any?> = ... // возвращаем результат обратно в корутину coroutine.resumeWith(outcome) } ... }
Это сделано для того чтобы корутина выполнила завершающие действия и перешла в конечное состояние:
override fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) } fun makeCompletingOnce(proposedUpdate: Any?): Any? { loopOnState { state -> val finalState = tryMakeCompleting(state, proposedUpdate) ... } } // я упростил логику до одного tryMakeCompleting() метода, // если хотите досконально разобраться во всех нюансах // читайте исходники JobSupport.tryMakeCompleting() fun tryMakeCompleting(state: Any?, result: Any?) { if (state !is Incomplete) return COMPLETING_ALREADY // получаем дочерние Job'ы val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY // получаем состояние завершения или создаем новое для отмены дочерних узлов val finishing = state as? Finishing ?: Finishing(list, false, null) // тут извлекается искючение либо из текущего состояния корутины state // либо из пришедшего результата result, его может не быть если // корутина завершается сама без отмен и ошибок val notifyRootCause: Throwable? = ... // если пришло исключение отмены или внештатная ошибка // нужно отменить дочерние Job'ы notifyRootCause?.let { notifyCancelling(list, it) } // текущая корутина будет ждать пока выполняться / отменятся дочерние корутины val child = list.nextChild() if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN // вычисляются финальное исключение и финальное состояние, // обычно если корутина завершается штатно конечным состоянием будет: // Unit для launch корутин, // Any для withContext корутин, // если с ошибкой / отменой то CompletedExceptionally val finalException = ... val finalState = ... if (finalException != null) { // для отмены cancelParent() всегда возращает true, // в случае // внештатного исключения оно будет прокидываться до главного // родителя, где вызовется метод handleJobException() и // исключение обработается через CoroutineExceptionHandler: // по умолчанию установлен CoroutineExceptionHandler, // который крашит программу со stacktrace'ом val handled = cancelParent(finalException) || handleJobException(finalException) if (handled) (finalState as CompletedExceptionally).makeHandled() } // устанавливаем финальное состояние _state.compareAndSet(state, finalState.boxIncomplete()) return finalState }
В итоге после завершения корутины у нее будет одно из следующих состояний:
-
Unit — для штатного завершения корутин, запущенных через launch билдер (
StandaloneCoroutine
). -
CompletedExceptionally
— для отмененной корутины или завершенной внештатно (исключение отличное отCancellationException
). -
Любой другой тип — для корутин, запущенных через withContext билдер (
DispatchedCorotuine
).
С финальным состоянием разобрались, теперь посмотрим как родительская корутина дожидается завершения своих детей, предположим у нас есть следующий код:
// корутина 1 launch { // корутина 2 launch(Dispatchers.IO) { ... } // корутина 3 launch(Dispatchers.IO) { ... } }
Не секрет что запуск корутин 2 и 3 произойдет без приостановки так как тут не надо дожидаться результата, а это значит что suspend блок корутины 1 сразу вернет управление через вызов Continuation.resumeWith()
, где как мы уже выяснили корутина должна будет завершится.
Чтобы корутина 1 дождалась детей, есть специальная логика:
fun tryMakeCompleting(state: Any?, result: Any?) { ... // текущая корутина будет ждать пока выполняться / отменятся дочерние корутины val child = list.nextChild() if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN ... // только после завершения детей корутина поменяет свое состояние // на полностью завершенное _state.compareAndSet(state, finalState.boxIncomplete()) ... } private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean { // в дочернюю Job'у будет добавлен специальный CompletionHandler, // который вызовется когда она будет завершаться val handle = child.childJob.invokeOnCompletion( invokeImmediately = false, handler = ChildCompletion(this, state, child, proposedUpdate) ) // если дочерняя Job'а еще не завершена ждем if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it // иначе смотрим следующую дочернюю Job'у val nextChild = child.nextChild() ?: return false return tryWaitForChild(state, nextChild, proposedUpdate) } private class ChildCompletion( private val parent: JobSupport, private val state: Finishing, private val child: ChildHandleNode, private val proposedUpdate: Any? ) : JobNode() { override val onCancelling get() = false // когда дочерняя Job'а будет завершаться она вызовет этот обработчик override fun invoke(cause: Throwable?) { // родительская корутина продолжит завершаться parent.continueCompleting(state, child, proposedUpdate) } }
Родительская корутина будет ждать завершения детей и пока последняя дочерняя корутина не вызовет ChildCompletion.invoke()
обработчик, она не перейдет в финальное состояние.
P.S. Спасибо чуваку @manwtein что упомянул в комментах про ожидание дочерних корутин в родителе!
Итоги
Ух, думал не выдержу до конца этого длинного текста, но чем заниматься в Новогодние праздники если не писать суперсложную техническую статью?).
Давайте подведем итоги:
-
Job
— элементCoroutineContext'а
, отвечающий за хранение одного из состояний корутины: выполнение, отмена, завершение, завершение с ошибкой, для lazy корутин есть еще состояние «не начала выполняться». -
В
Job
есть специальные Boolean поля, которые могут дать информацию о текущем состоянии корутины: isActive — выполняется корутина или нет, isCompleted — завершена корутина полностью или нет, isCancelled — отменена корутина или нет. -
JobSupport
— это одна из базовых реализацийJob'ы
, от которой наследуются другие реализации такие какSupervisorJob
например. -
JobSupport
может содержать ссылку на родительскуюJob'у
, если у корутины есть родитель, это нужно для пробрасывания отмены родителю в случае внештатной отмены. -
JobSupport
состояние хранит в себе дочерниеJob’ы
по следующей логике: если нет дочернихJob
, хранитсяEmpty
состояние; если есть одна дочерняяJob'а
хранитсяJobNode
узел, конкретный класс —ChildHandleNode
; если несколько дочернихJob
хранится состояниеNodeList
, где дочерниеJobNode
элементы лежат в связанном списке. -
Класс корутины
StandaloneCoroutine
является наследникомJobSupport
, поэтому вся логика изJobSupport
доступна корутине. -
Отмена корутины может быть штатной, например при вызове
Job.cancel()
, в таком случае отменятся только дочерние корутины, отмена не будет пробрасываться родителю. -
Отмена корутины может быть внештатной, например при выбросе любого исключения отличного от
CancellationException
, в таком случае отменятся дочерние корутины и отмена будет проброшена родителю, который может либо не пробрасывать дальше отмену (если этоSupervisorJob
), либо пробросить ее до самого главного родителя, где исключение будет обработаноCoroutineExceptionHandler'ом
. -
По умолчанию установлен
CoroutineExceptionHandler
, который крашит программу со stacktrace’ом. -
Фактическая отмена логики осуществляется не в самой корутине, а в правильно организованных suspend функциях таких как
delay()
или в suspend блоках, содержащих такие функции. -
Функция
delay()
под капотом используетCancellableContinuation
, который добавляется в родительскую корутину как дочерняяJob'а
и при отмене корутины аналогично дочерним корутинам отменяется. Самая главная фишкаCancellableContinuation
в том что при отмене он вызывает специальные обработчики, их можно добавить через методCancellableContinuation.invokeOnCancellation()
, в функцииdelay()
такой обработчик отменяет задержку. -
Отменить можно только тот suspend блок где suspend функции могут быть в приостановленном состоянии, в противном случае код выполнится как один большой метод.
-
После отмены или завершения корутины она переходит в одно из финальных состояний: Unit — для штатного завершения корутин, запущенных через launch билдер (
StandaloneCoroutine
),CompletedExceptionally
— для отмененной корутины или завершенной внештатно, любой другой тип — для корутин, запущенных через withContext билдер (DispatchedCorotuine
).
Полезные ссылки:
Пишите в комментах ваше мнение и всем хорошего кода!
ссылка на оригинал статьи https://habr.com/ru/articles/873076/
Добавить комментарий