Асинхронный Rust в трех частях
Во введении мы сказали, что async/await
это про futures и задачи. В первой части мы рассмотрели futures и теперь пришло время задач. Благо, мы с ними уже встречались, хоть мы их так и не называли. Последняя версия нашего основного цикла в первой части выглядела вот так:
let mut joined_future = Box::pin(future::join_all(futures)); let waker = futures::task::noop_waker(); let mut context = Context::from_waker(&waker); while joined_future.as_mut().poll(&mut context).is_pending() { … }
Показанная выше joined_future
это самый простой пример задачи. Это верхнеуровневый future, опрашиваемый основным циклом. Здесь у нас всего одна задача, но ничего не мешает нам добавить больше. Если бы у нас была коллекция задач, мы могли бы добавить ее в рантайме.
Это и делает tokio::task::spawn. Мы можем переписать наш первоначальный пример Tokio с использованием spawn
вместо join_all
:
#[tokio::main] async fn main() { let mut task_handles = Vec::new(); for n in 1..=10 { task_handles.push(tokio::task::spawn(foo(n))); } for handle in task_handles { handle.await.unwrap(); } }
foo
это все еще async fn
, но в остальном это очень похоже на наш изначальный пример thread::spawn. Как и потоки, но не как обычные future, задачи начинают выполняться в фоновом режиме сразу после вызова spawn
, так что применение .await
к handle задачи работает как join
к handle потока. В сетевых сервисах часто применяется подход с основным циклом, который слушает в ожидании новых подключений и создает новые потоки для обработки каждого из них. Асинхронные задачи позволяют нам использовать такой же подход без оверхеда потоков. Этим мы и займемся в третьей части.
Используя основной цикл из первой части мы напишем собственный spawn
. Это будет происходить в три этапа: сначала мы выделим место под несколько задач в основном цикле, затем напишем функцию spawn
для добавления новых задач и, наконец, реализуем JoinHandle.
Dyn
Мы уже знаем, как опрашивать несколько future за один раз, поскольку именно это мы делали при реализации JoinAll. Что же мы можем скопипастить?
Одна из вещей, которые необходимо изменить — тип Vec
для futures. Наш JoinAll
использовал Vec<Pin<Box<F>>>
, где F
— дженерик‑параметр типа, но в нашей основной функции нет каких‑либо параметров типа. Мы также хотим, чтобы новый вектор мог содержать futures разных типов одновременно. Нужная нам в данном случае фича Rust это динамические трейт‑объекты — dyn Trait
. Начнем с алиаса типа, чтобы не писать его по несколько раз:
type DynFuture = Pin<Box<dyn Future<Output = ()>>>;
Обратите внимание, что у DynFuture
нет параметров типа. Мы можем поместить любой future, упакованный в Box
, до тех пор пока Output
— ()
. Теперь, вместо создания join_future
в функции main
мы создадим Vec<DynFuture>
и начнем называть эти futures задачами:
fn main() { let mut tasks: Vec<DynFuture> = Vec::new(); for n in 1..=10 { tasks.push(Box::pin(foo(n))); } let waker = futures::task::noop_waker(); let mut context = Context::from_waker(&waker); …
Мы управляем Vec<DynFuture>
с помощью retain_mut
, как это делал JoinAll
, убирая futures из Vec
, как только они возвращают Ready
. Нам необходимо изменить цикл while
на loop/break
, чтобы выполнять опрос, проверять готовы ли мы, и затем обрабатывать Waker
’ов. Теперь это выглядит так:
let waker = futures::task::noop_waker(); let mut context = Context::from_waker(&waker); loop { // Poll each task, removing any that are Ready. let is_pending = |task: &mut DynFuture| { task.as_mut().poll(&mut context).is_pending() }; tasks.retain_mut(is_pending); // If there are no tasks left, we're done. if tasks.is_empty() { break; } // Otherwise handle WAKE_TIMES and sleep as in Part One... …
Это нормально работает, хоть и не ощущается как особое достижение. По большей части мы просто скопипастили код из JoinAll
и поправили типы. Но этим мы заложили важную основу.
Обратите внимание, что поведение данного цикла несколько отличается от того, как работают задачи в Tokio. Обычно Rust завершает работу когда основной поток закончил работу, не дожидаясь завершения фоновых процессов, точно также Tokio завершает работу, когда основная задача завершилась, не дожидаясь фоновых задач. Однако, наша версия основного цикла продолжает работу пока все задачи не завершатся. Также она предполагает, что у задач нет возвращаемого значения. Мы исправим эти два момента, когда доберемся до JoinHandle
, но сначала займемся spawn
.
Spawn
Функция spawn
должна добавлять новые future в Vec
задач. Как стоит реализовать доступ к Vec
? Было бы удобно, если бы мы могли делать то же, что делали с WAKE_TIMES
и сделать TASKS
глобальной переменной, защищенной Mutex
, но в этот раз это не сработает. Наш основной цикл вешает лок на WAKE_TIMES
по завершении опроса, но если мы сделаем TASKS
глобальной, то основной цикл будет вешать лок в процессе опроса и любая задача, вызывающая spawn
попадет в дедлок.
Мы обойдем это, создав два разных списка. Мы оставим tasks
на своем месте — в качестве локальной переменной основного цикла, а также добавим глобальный список NEW_TASKS
. Функция spawn
будет добавлять задачи в NEW_TASKS
:
static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new()); fn spawn<F: Future<Output = ()>>(future: F) { NEW_TASKS.lock().unwrap().push(Box::pin(future)); }
Теперь основной цикл может… погодите‑ка, оно не компилируется:
error[E0277]: `(dyn Future<Output = ()> + 'static)` cannot be sent between threads safely --> tasks_no_send_no_static.rs:43:19 | 43 | static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new()); | ^^^^^^^^^^^^^^^^^^^^^ `(dyn Future<Output = ()> + 'static)` cannot be sent between threads | = help: the trait `Send` is not implemented for `(dyn Future<Output = ()> + 'static)`, which is required by `Mutex<Vec<Pin<Box<(dyn Future<Output = ()> + 'static)>>>>: Sync`
Глобальные переменные в Rust должны быть Sync
, а Mutex<T> является Sync только когда T является Send. DynFuture
должен обещать, что он реализует Send
:
type DynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
Итак, теперь… не‑а, все еще не собирается:
error[E0277]: `F` cannot be sent between threads safely --> src/main.rs:46:36 | 46 | NEW_TASKS.lock().unwrap().push(Box::pin(future)); | ^^^^^^^^^^^^^^^^ `F` cannot be sent between threads safely | = note: required for the cast from `Pin<Box<F>>` to `Pin<Box<(dyn futures::Future<Output = ()> + std::marker::Send + 'static)>>`
Справедливо, spawn
должна делать то же самое:
fn spawn<F: Future<Output = ()> + Send>(future: F) { … }
Ну что, доволен? Не‑а:
error[E0310]: the parameter type `F` may not live long enough --> src/main.rs:46:36 | 46 | NEW_TASKS.lock().unwrap().push(Box::pin(future)); | ^^^^^^^^^^^^^^^^ | | | the parameter type `F` must be valid for the static lifetime... | ...so that the type `F` will meet its required lifetime bounds
Глобальные переменные имеют лайфтайм 'static
, а значит они не могут держать указатели на что‑либо, что может быть освобождено. Трейт‑объекты, такие как DynFuture
по умолчанию 'static
, а вот типы параметров — такие как F
— нет. Если spawn
хочет поместить F
в глобальную переменную, она должна обещать, что F
— 'static
:
fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) { … }
Наконец‑то оно компилируется. Многовато телодвижений просто чтобы создать глобальный Vec
, давайте подумаем, что именно мы создали: вместо «Vec
с futures», NEW_TASKS
это «Vec
потоко‑безопасных futures, не содержащих потенциально подвешенных указателей». У Rust нет сборщика мусора, так что подвешенные указатели привели бы к багам повреждения памяти и возможность сказать, что мы не хотим этого — неплохая вещь.
Итак…теперь основной цикл может перемещать задачи из NEW_TASKS
в tasks
. Это не требует много кода, но есть пара моментов, которые стоит учитывать, и в этот раз это баги рантайма вместо ошибок компиляции. В первую очередь, нам нужно опрашивать новые задачи до следующей итерации основного цикла, чтобы у них был шанс запросить пробуждение перед сном. Также нам надо убедиться, что NEW_TASKS
разблокирована перед опросом, иначе у нас снова возникнет дедлок, которого мы старались избежать. Вот расширенный основной цикл:
loop { // Poll each task, removing any that are Ready. let is_pending = |task: &mut DynFuture| { task.as_mut().poll(&mut context).is_pending() }; tasks.retain_mut(is_pending); // Collect new tasks, poll them, and keep the ones that are Pending. loop { let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else { break; }; // Polling this task could spawn more tasks, so it's important that // NEW_TASKS isn't locked here. if task.as_mut().poll(&mut context).is_pending() { tasks.push(task); } } // If there are no tasks left, we're done. if tasks.is_empty() { break; } // Otherwise handle WAKE_TIMES and sleep as in Part One... …
Подготовив почву, мы можем задать функцию async_main
и передать ей бразды запуска задач вместо хардкодинга списка задач в main
:
async fn async_main() { // The main loop currently waits for all tasks to finish. for n in 1..=10 { spawn(foo(n)); } } fn main() { let waker = futures::task::noop_waker(); let mut context = Context::from_waker(&waker); let mut tasks: Vec<DynFuture> = vec![Box::pin(async_main())]; …
Оно работает! Из‑за того, как мы добавляем и убираем задачи в и из NEW_TASKS
, порядок вывода отличается. Мы могли бы исправить это, но давайте оставим так. Это хорошее напоминание, что подобно потокам, одновременно выполняемые задачи могут выполняться в любом порядке.
JoinHandle
Как мы уже упоминали ранее, Tokio поддерживает фоновые задачи, которые не блокируют завершение программы, а также есть поддержка возврата значений у задач. Обе фичи требуют от tokio::task::spawn возвращать tokio::task::JoinHandle подобно тому, как thread::spawn возвращает thread::JoinHandle. Для того, чтобы тоже иметь данный функционал мы реализуем наш собственный JoinHandle
. Также, поскольку до этого мы сталкивались с блокированием только в контексте sleep
, мы познакомимся с новой формой блокирования и необычным багом, проистекающим из нее.
JoinHandle
должен общаться между двумя задачами: одна в процессе завершения, другая — ждет завершения первой. Ожидающей задаче нужно куда‑то поместить свой Waker
, чтобы завершающая могла его выполнить, а завершающей задаче в свою очередь надо куда‑то передать возвращаемое значение T
, чтобы ожидающая задача могла его получить. Они не нужны нам одновременно, так что мы можем использовать enum
. Он должен давать общий доступ и быть изменяем, так что обернем его в Arc
и Mutex
:
enum JoinState<T> { Unawaited, Awaited(Waker), Ready(T), Done, } struct JoinHandle<T> { state: Arc<Mutex<JoinState<T>>>, }
Ожидание завершения задачи будет происходить за счет ожидания JoinHandle
, так что последнему в свою очередь необходимо реализовать Future
. Загвоздка в том, что ожидающий поток хочет владеть переданным в JoinState::Ready(T)
значением T
, но Arc<Mutex<JoinState>>
позволяет обращаться к JoinState
только по ссылке, так что мы не можем переместить T
и «оставить дыру» там, куда ведет ссылка. Вместо этого мы заменим весь JoinState
, используя mem::replace:
impl<T> Future for JoinHandle<T> { type Output = T; fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<T> { let mut guard = self.state.lock().unwrap(); // Use JoinState::Done as a placeholder, to take ownership of T. match mem::replace(&mut *guard, JoinState::Done) { JoinState::Ready(value) => Poll::Ready(value), JoinState::Unawaited | JoinState::Awaited(_) => { // Replace the previous Waker, if any. *guard = JoinState::Awaited(context.waker().clone()); Poll::Pending } JoinState::Done => unreachable!("polled again after Ready"), } } }
Передаваемые spawn
future ничего не знают о JoinState
, так что нам будет нужна обертка для обработки возвращаемых значений и запуска Waker
при его наличии:
async fn wrap_with_join_state<F: Future>( future: F, join_state: Arc<Mutex<JoinState<F::Output>>>, ) { let value = future.await; let mut guard = join_state.lock().unwrap(); if let JoinState::Awaited(waker) = &*guard { waker.wake_by_ref(); } *guard = JoinState::Ready(value) }
Теперь мы можем создать JoinState
и использовать нашу обертку в spawn
, чтобы принимался любой тип вывода и возвращался JoinHandle
:
fn spawn<F, T>(future: F) -> JoinHandle<T> where F: Future<Output = T> + Send + 'static, T: Send + 'static, { let join_state = Arc::new(Mutex::new(JoinState::Unawaited)); let join_handle = JoinHandle { state: Arc::clone(&join_state), }; let task = Box::pin(wrap_with_join_state(future, join_state)); NEW_TASKS.lock().unwrap().push(task); join_handle }
Мы будем собирать и применять .await
к JoinHandle
’ам в async_main
, подобно тому как мы обрабатывали задачи Tokio ранее:
async fn async_main() { let mut task_handles = Vec::new(); for n in 1..=10 { task_handles.push(spawn(foo(n))); } for handle in task_handles { handle.await; } }
Теперь, когда мы можем явно ожидать задачи, мы бы хотели, чтобы основной цикл завершался после завершения основной задачи. Давайте отделим основную задачу от tasks
и переименуем список задач в other_tasks
:
fn main() { let waker = futures::task::noop_waker(); let mut context = Context::from_waker(&waker); let mut main_task = Box::pin(async_main()); let mut other_tasks: Vec<DynFuture> = Vec::new(); loop { // Poll the main task and exit immediately if it's done. if main_task.as_mut().poll(&mut context).is_ready() { return; } // Poll other tasks and remove any that are Ready. let is_pending = |task: &mut DynFuture| { task.as_mut().poll(&mut context).is_pending() }; other_tasks.retain_mut(is_pending); // Handle NEW_TASKS and WAKE_TIMES...
Готово! Мы сделали много изменений за раз и, к счастью, оно собирается. Даже почти работает. Программа выводит корректный текст, но после этого ловит панику:
… end 3 end 2 end 1 thread 'main' panicked at src/main.rs:143:50: sleep forever?
Это и есть тот самый интересный баг, который мы ждали.
Waker
Паника возникает на этой строке, которая была в основном цикле с первой части:
let next_wake = wake_times.keys().next().expect("sleep forever?");
Цикл собирается уйти в sleep
, так что он запрашивает следующее время пробуждения, но древо WAKE_TIMES
пустое. Раньше мы могли предполагать, что если какая‑то из задач возвращала Pending
, то должно существовать хотя бы одно время пробуждения, поскольку единственным источником блокировки был Sleep
. Но теперь у нас есть второй источник: JoinHandle
. Если JoinHandle
в Pending
, это может быть вызвано тем, что другая задача спит и задала время пробуждения. Но также это может произойти когда другая задача готова вернуть Ready
, но мы еще не опросили ее. Это сильно зависит от порядка задач в списке. Если задача в начале списка ждет задачу из конца, мы можем оказаться в ситуации с задачами в Pending
и без запланированных пробуждений.
Именно это и произошло. Основная задача скорее всего блокируется на первом JoinHandle
. Основной цикл пробуждается, опрашивает основную задачу и этот JoinHandle
все еще в Pending
. После чего он опрашивает все задачи из other_tasks
. Каждая из них выводит сообщение о завершении, отправляет сигнал своему JoinHandle
и возвращает Ready
. К этому моменту нам нужно опросить основную задачу вместо попытки сна. Как же нам передать это основному циклу? Мы могли бы создать еще один static
‑флаг, но есть вариант получше. Мы используем наш Waker
.
Мы еще с первой части использовали futures::task::noop_waker для передачи пустого Waker
. Когда Sleep
был единственным источником блокировки, у наших задач не было способа разблокировать другие задачи и все, что нам нужно было от Waker
— заглушка, чтобы код компилировался. Но ситуация поменялась. Наша функция wrap_with_join_state
уже вызывает Waker
‘ов корректно по завершении выполнения задач и было бы неплохо знать, когда это происходит. Почему бы не написать собственный Waker
?
Waker
реализует трейт From<Arc<W>>
, где W
— любой тип с трейтом Wake, который в свою очередь требует наличия метода wake
. Этот метод принимает Arc<Self>
, что немного забавно, но в целом позволяет нам делать что угодно. Самый простой вариант — сделать что‑то вроде Arc<Mutex<bool>>
и делать его true
когда какая‑либо задача получила запрос на пробуждение. Это не сильно отличается от static
‑флага, но дает чужим future’ам вызывать наш Waker
без необходимости знать реализацию основного цикла. Вот наш «bool
»:
struct AwakeFlag(Mutex<bool>); impl AwakeFlag { fn check_and_clear(&self) -> bool { let mut guard = self.0.lock().unwrap(); let check = *guard; *guard = false; check } } impl Wake for AwakeFlag { fn wake(self: Arc<Self>) { *self.0.lock().unwrap() = true; } }
Мы можем создать AwakeFlag
и затем из него Waker
в начале main
:
fn main() { let awake_flag = Arc::new(AwakeFlag(Mutex::new(false))); let waker = Waker::from(Arc::clone(&awake_flag)); let mut context = Context::from_waker(&waker); …
И, наконец, мы можем добавить ту самую проверку в основной цикл:
// Collect new tasks, poll them, and keep the ones that are Pending. loop { let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else { break; }; if task.as_mut().poll(&mut context).is_pending() { other_tasks.push(task); } } // Some tasks might wake other tasks. Re-poll if the AwakeFlag has been // set. Polling futures that aren't ready yet is inefficient but allowed. if awake_flag.check_and_clear() { continue; } // Otherwise handle WAKERS and sleep as in Part One...
Работает! Мы успешно реализовали задачи.
Пришло время пойти дальше и вместо сна и вывода текста на экран взглянуть на реальные ввод‑вывод и использовать spawn
для обработки сетевых соединений.
ссылка на оригинал статьи https://habr.com/ru/articles/854104/
Добавить комментарий