Асинхронщина в Rust: Стандартная библиотека и async/.await

от автора

Введение

Перед вами руководство по специфике асинхронного программирования на языке Rust — точка входа в экосистему библиотек, справочник, на который можно опираться при проектировании системы и решении нетривиальных задач. К прочтению рекомендую и опытным разработчикам, и новичкам в Rust, только решивших окунуться в эту кроличью нору.

Вас ждёт целый цикл статей разного уровня сложности и погружения, затрагивающий не только асинхронное программирование, но и полезные шаблоны проектирования в Rust, такие как перенос инвариантов бизнес-логики на уровень системы типов, индуктивные вычисления на типах и декларативное программирование посредством комбинаторов.

Сегодня мы рассмотрим API стандартной библиотеки для асинхронного программирования и сам долгожданный синтаксис async/.await.


Теория

Данное пособие несёт исключительно практический характер. За теорией обращайтесь к следующим источникам (по порядку):

  1. "Programming Paradigms for Dummies: What Every Programmer Should Know"
  2. Хабр
    1. "Асинхронность: назад в будущее"
  3. Wikipedia
    1. "Asynchronous I/O"
    2. "Event Loop"
    3. "Reactor pattern" & "Proactor pattern"
  4. "The C10K problem"


std

Стандартная библиотека недавно обзавелась обобщённым интерфейсом для асинхронного программирования, а именно — трейт Future и модуль std::task. Эти сущности связывают программы и библиотеки с разными средами асинхронного исполнения (пример такой среды — Tokio, речь о которой пойдёт в следующих статьях), тем самым достигается частичная независимость пользовательского кода от конкретных асинхронных сред.

Точно также, как замыкание представляет собой комбинируемую порцию работы синхронного, последовательного кода, трейт Future представляет собой комбинируемую порцию работы кода асинхронного (одну асинхронную операцию, футуру, фьючерс). Комбинируемость означает, что точно также, как и замыкание способно вызывать другие замыкания (состоять из них, комбинироваться), асинхронная операция способна вызывать другие асинхронные операции (помимо обычных замыканий/функций).

Рассмотрим в общих чертах принцип работы футуры:

В методе poll несложно разглядеть модель кооперативной многозадачности, при которой операционная система не осуществляет переключение контекста; вместо этого задачи сами передают контроль экзекьютору (вызывающему коду, планировщику), чтобы тот, в свою очередь, смог эффективно распределять работу задач на доступные физические исполнители (например, ядра процессора). Чаще всего, футуры исполняются на пуле потоков, но возможны и другие сценарии, например, однопоточное исполнение, а в общем случае и более экзотические варианты. Подробнее об этом в следующей статье, посвящённой Tokio.

Примечания

  • Принадлежность асинхронных операций потокам ОС и потоков ОС процессорным ядрам неустойчивая, т.е. одна футура может спокойно путешествовать по потокам ОС, а потоки ОС мигрировать по ядрам.

  • Правильнее будет назвать не "футуры", а зелёные потоки, или асинхронные операции верхнего уровня, таски, задачи.

Как было сказано выше, трейт не накладывает никаких требований на вызов метода poll, следующего после вызова, вернувшего Poll::Ready(Output): он может запаниковать, войти в бесконечный цикл и создавать множество других проблем. Неопределённое поведение, тем не менее, запрещено (нарушение целостности данных, неправильное использование небезопасных функций), вне зависимости от состояния футуры, потому что сама сигнатура poll ключевым словом unsafe не помечена. Не следует полагаться на конкретные реализации, лишённые непредвиденного поведения.

Объект типа, реализующего Future, — это, прежде всего, обычный объект (значение, переменная). Его можно хранить в динамическом массиве, передавать в функции, возвращать из функций, другими словами, делать с ним всё то, что позволено делать с другими объектами. Название такому явлению — сущность (программный компонент) первого класса, а программирование с использованием асинхронных операций в Rust — программирование высшего порядка.

Иллюстрация выше также содержит Waker. Подробно об этом в следующей секции.


WriteFuture

Для лучшего понимания реализуем свою асинхронною операцию WriteFuture, которая завершится после того, как данные будут отправлены по неблокирующему TCP сокету.

[https://gist.github.com/2f2040d1639bebf723924a73aaa262e7]

use std::{     future::Future,     io::{self, Read, Write},     net::{TcpListener, TcpStream},     pin::Pin,     task::{Context, Poll},     thread::{self, JoinHandle}, };  use tokio::runtime::Runtime;  struct WriteFuture<'a> {     socket: TcpStream,     data: &'a [u8], }  impl<'a> WriteFuture<'a> {     #[allow(dead_code)]     fn new(socket: TcpStream, data: &'a [u8]) -> Self {         socket.set_nonblocking(true).unwrap();         Self { socket, data }     } }  impl Future for WriteFuture<'_> {     type Output = io::Result<usize>;      fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {         let data = self.data;          match self.socket.write(data) {             Ok(length) => Poll::Ready(Ok(length)),             Err(err) if err.kind() == io::ErrorKind::WouldBlock => {                 cx.waker().wake_by_ref();                 Poll::Pending             }             Err(err) => Poll::Ready(Err(err)),         }     } }  fn main() {     let server = run_server();     let client = run_client();      server.join().unwrap();     client.join().unwrap(); }  const ADDR: &'static str = "127.0.0.1:18373";  fn run_server() -> JoinHandle<()> {     thread::spawn(|| {         let listener = TcpListener::bind(ADDR).unwrap();          let (mut client_accepted, _addr) = listener.accept().unwrap();          let mut message = String::new();         client_accepted.read_to_string(&mut message).unwrap();         dbg!(message);     }) }  fn run_client() -> JoinHandle<()> {     thread::spawn(|| {         let client = TcpStream::connect(ADDR).unwrap();          let mut rt = Runtime::new().unwrap();         rt.block_on(WriteFuture::new(client, b"Hello, world!")).unwrap();     }) }

Примечание

Наша WriteFuture имеет один фатальный недостаток: системный вызов отправки внутри self.socket.write(data) совершается каждый раз в при входе в WriteFuture::poll. Как это исправить — в следующей статье.

Вывод:

[src/main.rs:60] message = "Hello, world!"

Внимание на метод WriteFuture::poll. Рассмотрим подробно сопоставление с образом self.socket.write(data):

  • Ветка первая. Данные успешно записаны, возвращаем Poll::Ready(Ok(length)). Асинхронная операция теперь считается завершённой.

  • Ветка вторая. Попытка записи данных вернула ошибку io::ErrorKind::WouldBlock. Это значит, что данные не могут быть записаны мгновенно, без прерывания нашего приложения на продолжительное время. В этом случае мы даём понять экзекьютору, что наша футура требует ещё хотя бы одного вызова, чтобы стать завершённой.

  • Ветка третья. Попытка записи данных вернула ошибку, отличную от io::ErrorKind::WouldBlock. Это может быть потеря соединения, недостаточные привилегии. В этом случае возвращаем эту ошибку, после чего асинхронная операция считается завершённой.

Упрощённо у планировщика задач есть множество футур, каждая из которых в данный момент либо готова сделать прогресс, либо ещё не готова. Работа планировщика заключается в том, чтобы опросить как можно большее количество асинхронных операций, готовых сделать прогресс, за единицу времени.

По-умолчанию новая футура считается готовой совершить прогресс (разморожена), но после первого вызова poll, если, конечно, асинхронная операция уже не завершилась, она "замораживается". Waker — это ручка от экзекьютора с одной обязанностью: вовремя размораживать футуру. Для этого достаточно вызвать .wake_by_ref()/.wake() на ассоциированном с футурой Waker, который можно получить, вызвав метод cx.waker().

Другими словами, экзекьютор как минимум один раз вызывает poll на каждой асинхронной операции, а дальше уже всё зависит от возвращаемого значения poll и контекста. Такой дизайн позволяет не тратить время ЦПУ на бесполезный опрос асинхронных операций, ведь, например, настоящий асинхронный сетевой сокет (а не наша пародия) будет готов принять/отправить данные лишь после того, как очередь событий операционной системы (epoll, kqueue, …) возвратит соответствующее событие.

Без разницы кто и откуда уведомляет экзекьютора о готовности футуры совершить прогресс; достаточным условием является обладать её контекстом. Например, разумно будет в самом цикле событий (или где-то рядом с ним) контроллировать контексты асинхронных сетевых сокетов, потому что их готовность напрямую зависит от поступления события от ОС.

Внутри fn run_client мы запускаем нашу футуру WriteFuture:

let mut rt = Runtime::new().unwrap(); rt.block_on(WriteFuture::new(client, b"Hello, world!"));

В первой строчке создаётся сам рантайм Tokio, а после этого выполнение потока ОС (напомню, что тело fn run_client находится в thread::spawn) блокируется до того момента, когда завершится WriteFuture. Метод block_on — это точка входа в рантайм, главная асинхронная операция, которая запускает все другие (об этом в следующей статье).


async/.await

Как написать асинхронную операцию, которая принимает на вход три другие асинхронные операции, отображает их результаты в объекты иного типа и вычисляет их сумму? Три асинхронные операции могут быть, например, отправкой/считыванием данных на/из сервера, функция отображения переводит отправленные/считанные байты в статистику, затем три статистики складываются в одну.

Существует три способа реализовать это:

  1. Инкапсуляция футур и ручное управление ими в [Future::poll];
  2. Адапторы;
  3. Синтаксис async/.await.

Вариант №1 (ручное управление)

Показать код

use std::{     future::Future,     ops::AddAssign,     pin::Pin,     task::{Context, Poll}, };  use pin_project::pin_project;  #[pin_project] struct CompoundFuture<Fut1, Fut2, Fut3, F, U> {     fut1: Option<Fut1>,     fut2: Option<Fut2>,     fut3: Fut3,     f: F,     result: Option<U>, }  impl<Fut1, Fut2, Fut3, F, U, T> CompoundFuture<Fut1, Fut2, Fut3, F, U> where     Fut1: Future<Output = T>,     Fut2: Future<Output = T>,     Fut3: Future<Output = T>,     F: FnMut(T) -> U, {     #[allow(dead_code)]     fn new(fut1: Fut1, fut2: Fut2, fut3: Fut3, f: F) -> Self {         Self {             fut1: Some(fut1),             fut2: Some(fut2),             fut3,             f,             result: None,         }     } }  impl<Fut1, Fut2, Fut3, F, T, U> Future for CompoundFuture<Fut1, Fut2, Fut3, F, U> where     Fut1: Future<Output = T>,     Fut2: Future<Output = T>,     Fut3: Future<Output = T>,     F: FnMut(T) -> U,     U: AddAssign, {     type Output = U;      fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {         let this = self.project();          if let Some(fut1) = this.fut1 {             // SAFETY: Pin::new_unchecked is safe because             // we won't move fut1 (fut2, fut3)             match unsafe { Pin::new_unchecked(fut1) }.poll(cx) {                 Poll::Pending => return Poll::Pending,                 Poll::Ready(x) => {                     *this.result = Some((this.f)(x));                     *this.fut1 = None;                 }             }         }          if let Some(fut2) = this.fut1 {             match unsafe { Pin::new_unchecked(fut2) }.poll(cx) {                 Poll::Pending => return Poll::Pending,                 Poll::Ready(x) => {                     let result = this.result.as_mut().unwrap();                     *result += (this.f)(x);                     *this.fut2 = None;                 }             }         }          match unsafe { Pin::new_unchecked(this.fut3) }.poll(cx) {             Poll::Pending => Poll::Pending,             Poll::Ready(x) => {                 let result = this.result.as_mut().unwrap();                 *result += (this.f)(x);                 Poll::Ready(this.result.take().unwrap())             }         }     } }

Вариант №2 (адапторы)

use std::ops::AddAssign;  use futures::Future;  fn compound_future<'a, Fut1, Fut2, Fut3, T, U, F>(     fut1: Fut1,     fut2: Fut2,     fut3: Fut3,     mut f: F, ) -> impl Future<Item = U, Error = ()> + 'a where     Fut1: Future<Item = T, Error = ()> + 'a,     Fut2: Future<Item = T, Error = ()> + 'a,     Fut3: Future<Item = T, Error = ()> + 'a,     F: FnMut(T) -> U + 'a,     U: AddAssign, {         fut1.join3(fut2, fut3).map(move |(a, b, c)| {                 let mut result = f(a);                 result += f(b);                 result += f(c);                 result         }) }

Примечание

В коде выше используется версия futures = "0.1", поэтому сигнатура трейта Future немного другая, но пониманию ситуации это никак не мешает.

Вариант №3 (async/.await)

use std::{future::Future, ops::AddAssign};  async fn compound_future<Fut1, Fut2, Fut3, T, U, F>(     fut1: Fut1,     fut2: Fut2,     fut3: Fut3,     mut f: F, ) -> U where     Fut1: Future<Output = T>,     Fut2: Future<Output = T>,     Fut3: Future<Output = T>,     F: FnMut(T) -> U,     U: AddAssign, {     let mut result = f(fut1.await);     result += f(fut2.await);     result += f(fut3.await);     result }

Анализ

В первом варианте мы просто инкапсулировали асинхронные операции в CompoundFuture и в каждом вызове poll делаем прогресс текущей операции вручную. Не сложно догадаться, что это чревато ошибками, ведь в конечном итоге придётся писать своё подобие конечного автомата из Future (что мы и сделали) и много-много повторяющегося кода. Данный подход почти невозможно встретить в пользовательском коде, но полностью исключать его не следует, т.к. он всё ещё используется во внутренностях основополагающих библиотек (futures, Tokio и других).

Второй подход уместился всего в 24 строки, что в 3.5 раза меньше решения с ручным управлением, и, как следствие, понижается риск ошибки, повышается читабельность и поддерживаемость. Он использует понятие адапторов (комбинаторов). Дадим нестрогое определение адапторам:

Адапторы в контексте асинхронных операций — это методы, определённые на типе Fut1, отображающие (возможно, с дополнительными аргументами, влияющими на поведение) Fut1 в Fut2<..., Fut1, ...>, где Fut1: Future, Fut2: Future.

Путём последовательного вызова адапторов генерируются новые асинхронные операции (форма ленивых вычислений), каждая из которых обладает собственным поведением. Например, метод futures::future::Future::join3 возвращает футуру, вычисляющую результаты трёх переданных асинхронных операций, а futures::future::Future::map возвращает футуру, результатом которой есть значение нового типа, полученное заданным отображением результата переданной футуры.

С данным подходом всё гладко ровно до тех пор, пока компилятор не встретит несовпадение типов. Вот пример вывода компилятора в ответ на ошибку в типах в продолжительной цепочки вызова адапторов:

= note: expected type ()         found type futures::future::and_then::AndThen<futures::stream::concat::Concat2<hyper::body::body::Body>, futures::future::or_else::OrElse<futures::future::map::Map<futures::future::and_then::AndThen<futures::future::and_then::AndThen<futures::future::map_err::MapErr<futures::future::result_::FutureResult<contract::Update, serde_json::error::Error>, [closure@src\main.rs:139:22: 144:14]>, std::result::Result<(contract::User, std::string::String, i64, i64), http::response::Response<hyper::body::body::Body>>, [closure@src\main.rs:145:23: 162:14]>, futures::future::map_err::MapErr<futures::future::and_then::AndThen<impl futures::future::Future, futures::future::either::Either<futures::future::and_then::AndThen<impl futures::future::Future, futures::future::either::Either<futures::future::then::Then<impl futures::future::Future, futures::future::either::Either<impl futures::future::Future, futures::future::result_::FutureResult<(), telegram_client::TelegramClientError>>, [closure@src\main.rs:211:51: 224:46 telegram_client:_, chat_id:_, text:_]>, futures::future::result_::FutureResult<(), telegram_client::TelegramClientError>>, [closure@src\main.rs:173:90: 230:30 file_id:_, ext:_, user:_, message_id:_, dbs:_, chat_id:_, telegram_client:_]>, futures::future::result_::FutureResult<(), telegram_client::TelegramClientError>>, [closure@src\main.rs:166:31: 235:22 user:_, chat_id:_, message_id:_, telegram_client:_, file_id:_, dbs:_]>, [closure@src\main.rs:236:30: 242:22]>, [closure@src\main.rs:163:23: 243:14 telegram_client:_, dbs:_]>, [closure@src\main.rs:245:18: 248:14]>, std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>, fn(http::response::Response<hyper::body::body::Body>) -> std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error> {std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>::Ok}>, [closure@src\main.rs:136:54: 250:6 telegram_client:_, dbs:_]>

[Взято отсюда]

Это происходит, потому что адапторы порождают глубоковложенные обобщённые типы: каждый новый адаптор — это новый уровень вложенности. Компилятору зачастую не остаётся ничего лучше, кроме как вывести их целиком на экран, вместе с типами сгенерированных замыканий, лайфтаймами и полными путями (что и демонстрирует сообщение об ошибке выше).

Проблемы предыдущих подходов решает синтаксис async/.await, силами которого можно писать асинхронный код в синхронном стиле. Асинхронный участок кода (async fn, async { ... } или async move { ... }) компилятором трансформируется в объект, реализующий Future, а fut.await внутри этих участков асинхронно ждёт выполнения fut. Это означает, что в итоговом методе poll код, расположенный после fut.await, выполняться не будет, пока fut не вернёт Poll::Ready(Output). Все приведённые выше реализации одинаковы по смыслу (но итоговый код может разный сгенерироваться). Асинхронный участок кода порождает объект с неявным типом в противовес комбинаторам, которые наслаивают их друг на друга.

Ещё одно неочевидное преимущество async/.await над адапторным/комбинаторным подходом — возможность занимать данные между "вызовами" .await — устраняется нужда в клонировании или использовании сырых (небезопасных) указателей.


Альтернативные взгляды на проблему

Как и с любым нетривиальным решением в дизайне языка, особенно совмещающего системное программирование со сравнительно мощной системой типов, существуют несколько противоположных мнений. Вот некоторые аргументы против async/.await:


async move {… } или async {… } ?

Наличие разницы между async move { ... } и async { ... } может сперва показаться неочевидной, ведь у версии простого блока ({ ... }) не существует move-аналога. Дело в том, что первый асинхронный вариант (с move) овладевает захваченными переменными, а второй вариант (без move) — сначала пытается заимствовать среду иммутабельно, если не получилось — заимствовать мутабельно, если и это не получилось — овладевает (вспомним, что асинхронные блоки преобразуются компилятором в анонимные типы, реализующие трейт Future).

Вот простой пример, демонстрирующий различие. Функция foo() два раза выводит строку с abc на экран:

async fn foo() {     let string = String::from("abc");      async { dbg!(&string); }.await;     async { dbg!(&string); }.await; }

То есть два асинхронных блока заняли string. Если же добавить ключевое слово move после и первого, и второго async, то получим ошибку компиляции, как и следовало ожидать, ведь async move { ... } пытается сразу овладеть переменной:

error[E0382]: use of moved value: `string`  --> src/lib.rs:5:16   | 2 |     let string = String::from("abc");   |         ------ move occurs because `string` has type `std::string::String`, which does not implement the `Copy` trait 3 |      4 |     async move { dbg!(&string); }.await;   |                ------------------   |                |       |   |                |       variable moved due to use in generator   |                value moved here 5 |     async move { dbg!(&string); }.await;   |                ^^^^^^^^------^^^^   |                |       |   |                |       use occurs due to use in generator   |                value used here after move

Рассмотрим ещё один пример:

async fn foo() -> String {     let string = String::from("abc");      async { string }.await }

Здесь асинхронный блок не смог заимствовать string иммутабельно и мутабельно, и совершил свою последнюю (удачную) попытку — овладел ей. Если переписать код с использованием move, то результат останется тем же.

И наконец, рассмотрим пример с мутабельным заимствованием:

async fn foo() {     let mut string = String::from("abc");      async { string.push_str("def"); }.await;      dbg!(string); }

Заимствовать иммутабельно не вышло, ведь сигнатура у String::push_str требует &mut self, значит заимствуем мутабельно — компилируется успешно. Если же присоединить move, то, как вы уже догадались, произойдёт ошибка компиляции, ведь на последней строке функции располжен вызов макроса dbg!, требующего владения string:

error[E0382]: use of moved value: `string`  --> src/lib.rs:6:10   | 2 |     let mut string = String::from("abc");   |         ---------- move occurs because `string` has type `std::string::String`, which does not implement the `Copy` trait 3 |      4 |     async move { string.push_str("def"); }.await;   |                ---------------------------   |                | |   |                | variable moved due to use in generator   |                value moved here 5 |      6 |     dbg!(string);   |          ^^^^^^ value used here after move


Асинхронные замыкания?

Попробуем скомпилировать следующий код:

fn main() {     let closure = async || {         dbg!();     }; }

Вывод:

error[E0658]: async closures are unstable  --> src/main.rs:2:19   | 2 |     let closure = async || {   |                   ^^^^^   |   = note: see issue #62290 <https://github.com/rust-lang/rust/issues/62290> for more information

То есть асинхронные замыкания ещё не стабилизировались (RFC). Не путайте асинхронные замыкания и замыкания, возвращающие Future:

  • Первое — async [move] |...| { ... } (нестабилизированное)
  • Второе — |...| async [move] { ... } (стабилизированное)


Pin

Как вы уже могли заметить, в сигнатуре метода Future::poll присутствует self: Pin<&mut Self>. Что это значит? Это даёт гарантию компилятору, что асинхронная операция не переместится в памяти во время выполнения. Зачем это нужно? Представим ситуацию, где на вход компилятору подаётся такая конструкция (взято отсюда):

async {     let mut x = [0; 128];     let read_into_buf_fut = read_into_buf(&mut x);     read_into_buf_fut.await;     println!("{:?}", x); }

Должно быть сгенерировано следующее:

struct ReadIntoBuf<'a> {     buf: &'a mut [u8], // Указывает на `x` снизу }  struct AsyncFuture {     x: [u8; 128],     read_into_buf_fut: ReadIntoBuf<'self>, }

Примечание

Лайфтайм 'self не разрешён в пользовательском коде, но в целях демонстрации он присутствует в read_into_buf_fut: ReadIntoBuf<'self>. Это означает, что ссылка живёт столько же, сколько и сама структура AsyncFuture.

Если объект AsyncFuture переместится в памяти, то указатель в read_into_buf_fut.buf инвалидируется (станет указывать на неправильное значение), что вызовет UB. Решение простое — запретить перемещение в памяти асинхронных операций во время их выполнения, что и делает self: Pin<&mut Self>.

Pin многие считают сложной темой в Rust, которую стороной обойти не получиться, если вы собираетесь вплотную заниматься асинхронным программированием. Советую прямо сейчас внимательно прочесть соответствующую страницу в документации.


Итог

Future — трейт для асинхронных (конкурентных) вычислений с poll-based кооперативной моделью исполнения. Объекты Future являются программными компонентами первого класса: мы можем с ними обращаться как нам вздумается, а сами по себе они ничего не делают — чтобы их выполнить, необходимо вызывать метод Future::poll до того момента, когда он вернёт Poll::Ready(Output). Обычно этим занимается отдельный модуль, называемый экзекьютором (или планировщиком).

async/.await в Rust — синтаксис для построения анонимных асинхронных операций. Код которых смотрится так, будто выполнен в синхронном стиле.

Ключевое слово async, после которого следуют либо фигурные скобки, либо остальная сигнатура функции, обозначает начало асинхронного блока, т.е. нечто, вычисляющее анонимную асинхронную операцию.

Ключевое слово await, которое синтаксически ведёт себя как поле, асинхронно ожидает футуру fut (fut.await). Остальное содержимое асинхронного блока будет выполнено лишь после завершения fut. Комбинаторный аналог — futures::Future::and_then (futures 0.1.x).


Далее — про то, что позволяет безжизненные футуры превратить в работающий код: про асинхронную среду исполнения Tokio.

Благодарности

За ревью спасибо @blandger, также другим людям за поддержку в русскоязычном Телеграм-чате @rust_async, куда вы тоже можете задавать вопросы, связанные с асинхронным программированием в Rust.

ссылка на оригинал статьи https://habr.com/ru/post/504020/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *