Введение в hyper: серверная часть

от автора

hyper — это низкоуровневая HTTP-библиотека на Rust. Обычно она служит строительным блоком для более высокоуровневых библиотек, таких как axum, tonic, reqwest и других. Но иногда знание hyper могут требовать в вакансиях.

hyper предоставляет api как для клиента, так и для сервера, но в этой статье мы сосредоточимся исключительно на серверной части.

Первый «Hello, World!»

Создаём новый проект:

cargo new hyper-server

Добавим зависимости в Cargo.toml:

[dependencies] hyper = { version = "1.6.0", features = ["server", "http1"] }      hyper-util = { version = "0.1", features = ["full"] } http-body-util = "0.1.3"   tokio = { version = "1.44.2", features = ["rt-multi-thread", "macros"] }

Что делает каждая библиотека:

  • hyper — основная HTTP-библиотека, включаем фичи поддержки HTTP/1.1-сервера

  • hyper-util — предоставляет утилиты и адаптеры поверх hyper, упрощающие его использование

  • http-body-util — вспомогательная библиотека для работы с HTTP-body

  • tokio — асинхронный рантайм. С версии 1.0 hyper был отвязан от рантайма tokio, поэтому при желании можно использовать альтернативный рантайм

Необходимые импорты:

use hyper::{       Request, Response,       body::{Bytes, Incoming},       server::conn::http1,       service::service_fn,   };   use hyper_util::{Full, rt::TokioIo};   use std::{convert::Infallible, net::SocketAddr};   use tokio::net::TcpListener;

Пояснения:

  • Request — представляет HTTP-запрос

  • Response — представляет HTTP-ответ

  • Bytes — тип для эффективного представления неизменяемых бинарных данных

  • Incoming — асинхронный поток байтов, представляющий тело HTTP-запроса, поступающее от клиента

  • http1 — модуль для работы с HTTP/1.1-соединениями

  • service_fn — функция адаптер, позволяющая создать Service из обычной асинхронной функции

  • Full — вспомогательная обёртка для заранее известного содержимого Body

  • TokioIo — адаптер, который оборачивает tokio::net::TcpStream в тип, совместимый с hyper

  • Infallible — тип ошибки, которая никогда не происходит

  • SocketAddr — структура, представляющая IP-адрес и порт

  • TcpListener— асинхронный TCP-сервер

Обработчик запроса:

async fn hello(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {       Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))   }

С первого взгляда обработчик может показаться достаточно громоздким:

  • _: Request<Incoming> — входящий HTTP-запрос от клиента. Используем нижнее подчёркивание, чтобы явно показать, что игнорируем содержимое запроса, так как ответ всегда один и тот же

  • Result<Response<Full<Bytes>>, Infallible> — тип возвращаемого значения обработчика:

    • Response<Full<Bytes>> — HTTP-ответ с телом, представленным блок байтов

    • Infallible — обозначает, что наш обработчик не может вернуть ошибку

  • Response::new(Full::new(Bytes::from("Hello, World!"))) — оборачивает строку в байты и преобразует их в тело ответа

Запуск сервера:

#[tokio::main]   async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {       // IP-адрес и порт, на которых будет работать сервер    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));        // Создаём асинхронный TCP-сервер       let listener = TcpListener::bind(addr).await?;          // Бесконечный цикл: принимаем входящие соединения       loop {          // Ждём новое соединение от клиента           let (stream, _) = listener.accept().await?;              // Адаптер, который оборачивает `TcpStream` и делает его совместимым с hyper         let io = TokioIo::new(stream);              // Для каждого соединения создаётся отдельная асинхронная задача           tokio::task::spawn(async move {              // HTTP/1.1 сервер, привязанный к нашему обработчику `hello`               if let Err(err) = http1::Builder::new()                   .serve_connection(io, service_fn(hello))                   .await               {               // обрабатываем возможные ошибки                 eprintln!("Error serving connection: {:?}", err);               }           });       }   }

Запускаем сервер:

cargo run

Проверяем работу сервера:

curl http://127.0.0.1:3000

В данном примере используется модуль http1. Если ван нужен http2, то вы можете посмотреть его реализацию в официальном примере.

Эхо сервер

Новые импорты:

use hyper::{ body::{Body, Frame}, Method, StatusCode } use http_body_util::{Empty, combinators::BoxBody, BodyExt}; 

Пояснения:

  • Body — трейт, описывающий HTTP тело

  • Frame — единица данных в теле HTTP-сообщения. Используется для представления либо части тела (Bytes), либо сигнала конца потока

  • Method — перечисление всех возможных HTTP-методов (GET, POST, PUT, DELETE и т.д.)

  • StatusCode — перечисление стандартных HTTP-статусов (200 OK, 404 Not Found, 500 Internal Server Error и т.д.)

  • Empty — вспомогательная обёртка для пустого содержимого Body

  • BoxBody — обобщённый тип тела ответа. Позволяет вернуть из функции разные типы Body

  • BodyExt — набор удобных методов для работы с телом запроса

Вспомогательные функции для создания полных и пустых тел запросов:

/// Создаёт пустое тело ответа fn empty() -> BoxBody<Bytes, hyper::Error> {       Empty::<Bytes>::new()           .map_err(|never| match never {})           .boxed()   }    /// Оборачивает переданный chunk (например, `&str` или `Vec<u8>`) в тело ответа fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {       Full::new(chunk.into())           .map_err(|never| match never {})           .boxed()   } 

Обработчики:

/// Возвращает клиенту тело запроса без изменений.    fn echo(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {       Ok(Response::new(req.into_body().boxed()))   }      /// Обработчик HTTP-запроса, который возвращает тело запроса,   /// преобразованное в верхний ASCII-регистр   async fn echo_uppercase(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {       // Преобразуем поток тела запроса по частям       let frame_stream = req.into_body().map_frame(|frame| {           let frame = if let Ok(data) = frame.into_data() {               // Преобразуем каждый байт в верхний ASCII-регистр               data.iter()                   .map(|byte| byte.to_ascii_uppercase())                   .collect::<Bytes>()           } else {               Bytes::new()           };              Frame::data(frame)       });          Ok(Response::new(frame_stream.boxed()))   }      /// Обработчик HTTP-запроса, который возвращает тело запроса в перевёрнутом виде   async fn echo_reversed(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {       // Защита от слишком больших тел       let upper = req.body().size_hint().upper().unwrap_or(u64::MAX);       // Если размер тела запроса превышает 64 КБ, возвращает ошибку `413 Payload Too Large`       if upper > 1024 * 64 {           let response = Response::builder()               .status(StatusCode::PAYLOAD_TOO_LARGE)               .body(full("Body too big"))               .unwrap();              return Ok(response);       }          // Читаем всё тело целиком       let whole_body = req.collect().await?.to_bytes();          // Разворачиваем байты в обратном порядке       let reversed_body = whole_body.iter()           .rev()           .cloned()           .collect::<Vec<u8>>();          Ok(Response::new(full(reversed_body)))   }      /// Возвращает ответ `404 Not Found` с пустым телом async fn not_found() -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {       let response = Response::builder()           .status(StatusCode::NOT_FOUND)           .body(empty())           .unwrap();          Ok(response)   }

Как говорилось в начале статьи, hyper — это достаточно низкоуровневая библиотека, и в отличие от axum или actix-web, не предоставляет встроенную маршрутизацию. Поэтому её нужно реализовать вручную:

async fn router(      req: Request,  ) -&gt; Result&gt;, hyper::Error&gt; {  // Сопоставляем HTTP-метод и путь    match (req.method(), req.uri().path()) {          (&amp;Method::POST, "/echo") =&gt; echo(req),          (&amp;Method::POST, "/echo/uppercase") =&gt; echo_uppercase(req).await,          (&amp;Method::POST, "/echo/reversed") =&gt; echo_reversed(req).await,          _ =&gt; not_found().await,      }  } 

В функции main обновляем service_fn :

.serve_connection(io, service_fn(router))

Запускаем сервер и проверяем его работу:

curl -X POST http://127.0.0.1:3000/echo -d "hello world" curl -X POST http://127.0.0.1:3000/echo/uppercase -d "hello world" curl -X POST http://127.0.0.1:3000/echo/reversed -d "hello world"

Middleware

Добавим к текущему эхо серверу middleware, который будет логировать метод и путь каждого HTTP-запроса

Новая зависимость в Cargo.toml:

[dependencies] tower = "0.5.2" # предоставляет абстракции для middleware

Новые импорты:

use hyper::service::Service; use tower::ServiceBuilder;  
  • Service — используется для подключения пользовательской логики к сетевому соединению (например, service_fn(router) создаёт имплементацию этого трейта)

  • ServiceBuilder — позволяет подключить middleware к сервису

Пишем простой middleware для логирования запросов:

/// Простая обёртка, логирующая HTTP-запросы   #[derive(Debug, Clone)]   pub struct Logger<S> {       /// Сервис, к которому делегируется обработка запроса       inner: S,   }      impl<S> Logger<S> {       /// Создание нового логера, оборачивающего другой сервис       pub fn new(inner: S) -> Self {           Logger { inner }       }   }      type Req = Request<Incoming>;      /// Реализация трейта `Service` для `Logger`,   /// позволяющая использовать его как middleware   impl<S> Service<Req> for Logger<S>   where       S: Service<Req>,  // Внутренний сервис должен реализовывать трейт `Service`   {       // Тип ответа будет таким же, как у внутреннего сервиса       type Response = S::Response;       // Тип ошибки - тоже такой же       type Error = S::Error;       // Тип возвращаемого future (обработчика запроса)       type Future = S::Future;          /// Метод, вызываемый при каждом запросе       fn call(&self, req: Req) -> Self::Future {           // Логируем метод и путь запроса           println!("processing request: {} {}", req.method(), req.uri().path());              // Передаём запрос дальше во внутренний сервис           self.inner.call(req)       }   }

Обновляем обработчик в функции main:

tokio::task::spawn(async move {       // Оборачиваем обработчик маршрутов в Service                     let svc = service_fn(router);        // Добавляем наш middleware к обработчику       let svc = ServiceBuilder::new().layer_fn(Logger::new).service(svc);       // Ожидаем завершение соединения (или ошибку)      if let Err(err) = http1::Builder::new().serve_connection(io, svc).await {           eprintln!("server error: {}", err);       }   });

Теперь, при каждом запросе, будут выводиться записи вида:

processing request: POST /echo/reversed processing request: POST /echo 

Для упрощения примера, запросы логируются с помощью println!. Если вам нужен полноценный логер, то ознакомьтесь с крейтом tracing

Gracefully Shutdown

Graceful shutdown — процесс, при котором сервер:

  • перестаёт принимать новые подключения

  • позволяет завершить текущие соединения

  • корректно освобождает ресурсы (файлы, БД, сокеты и т.п.)

Что нужно для реализации:

  • Сигнал завершения

  • Цикл, обрабатывающий входящие соединения

  • Наблюдатель, координирующий завершение соединений

Добавим в tokio новую фичу для обработки сигналов:

[dependencies] tokio = { version = "1.44.2", features = [..., "signal"] } # добавляем фичу для обработки сигналов

Обработка сигнала:

async fn shutdown_signal() {     // Ожидание сигнала CTRL+C      tokio::signal::ctrl_c()         .await         .expect("failed to install CTRL+C signal handler"); }

Обновляем функцию main для отслеживания сигнала отключения и подключаем отслеживание сигнала завершения для соединений:

#[tokio::main]   async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {       // IP-адрес и порт, где будет слушать сервер   let addr = SocketAddr::from(([0, 0, 0, 0], 3000));        // Создаём асинхронный TCP-сервер       let listener = TcpListener::bind(addr).await?;       // Создаём объект для управления "graceful shutdown" (мягкое завершение)       let graceful = hyper_util::server::graceful::GracefulShutdown::new();     // Подключаем сигнал завершения       // После его получения начнётся завершение работы сервера         let mut signal = std::pin::pin!(shutdown_signal());          // Запускаем цикл, чтобы постоянно принимать входящие соединения       loop {           tokio::select! {               // Обработка новых соединений               Ok((stream, _addr)) = listener.accept() => {                   // Адаптер, который оборачивает `TcpStream` и делает его совместимым с hyper                   let io = TokioIo::new(stream);                   // Оборачиваем обработчик маршрутов в Service                                 let svc = service_fn(router);                   // Добавляем наш middleware к обработчику                   let svc = ServiceBuilder::new().layer_fn(Logger::new).service(svc);                   // Создаём соединение HTTP/1.1                   let conn = http1::Builder::new().serve_connection(io, svc);                   // Подключаем отслеживание сигнала завершения для соединения                   let fut = graceful.watch(conn);                     // Для каждого соединения создаётся отдельная асинхронная задача                   tokio::task::spawn(async move {                       // Ожидаем завершение соединения (или ошибку)                       if let Err(err) = fut.await {                           eprintln!("server error: {}", err);                       }                   });               },               // Получен сигнал завершения               _ = &mut signal => {                   // Закрываем listener, чтобы не принимать новые соединения                   drop(listener);                   eprintln!("graceful shutdown signal received");                   // Прерываем основной цикл                   break;               }           }       }          // После выхода из цикла — ждём завершения всех активных соединений       tokio::select! {           // Успешное завершение всех соединений           _ = graceful.shutdown() => {               eprintln!("all connections gracefully closed");           },           // Если соединения не закрылись за 10 секунд — принудительно завершаем           _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {               eprintln!("timed out wait for all connections to close");           }       }          Ok(())   }

Теперь, при сигнале отключения, будет выводиться:

graceful shutdown signal received all connections gracefully closed

Собираем сервер в docker контейнер

Dockerfile

FROM rust:1.86-slim AS builder   RUN apt-get update && apt-get install musl-tools -y && rustup target add x86_64-unknown-linux-musl   WORKDIR /usr/src/app   COPY Cargo.toml Cargo.lock ./   COPY src src   RUN cargo build --target x86_64-unknown-linux-musl --release      FROM scratch   COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/hyper-server /usr/local/bin/hyper-server   EXPOSE 3000   CMD ["hyper-server"]

Сборка контейнера:

docker build -t hyper-server .

Запуск контейнера:

docker run --rm -p 3000:3000 hyper-server

Проверяем работу сервера:

curl -X POST http://127.0.0.1:3000/echo -d "hello world"

Заключение

Если вы хотите продолжить изучение hyper, то ознакомитесь с официальной документацией и примерами.

Это моя первая статья на Хабре, если вы нашли какие-то ошибки или неточности, буду рад уточнениям в комментариях.


ссылка на оригинал статьи https://habr.com/ru/articles/917936/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *