В этой статье я подготовил для вас прототип CRUD-приложения, которое использует файберы и неблокирующие(асинхронные) возможности драйвера PostreSQL. Вместе они дают любопытные результаты по производительности и потреблению памяти.
Данная статься является продолжением статей:
С ними рекомендуется познакомиться (но совсем необязательно), чтобы лучше понять приведённый код прототипа. Если кратко, то описанный в предыдущих статьях прототип это HTTP-сервер на PHP, который самостоятельно слушает HTTP-порт, принимает HTTP-запросы и формирует ответы, делая это без помощи Nginx, Apache и PHP FPM. Достичь конкурентной обработки запросов ему помогает расширение ev (для отслеживания состояния сетевых сокетов, как более совершенная альтернатива функции socket_select()) и файберы. В данной статье ко всему этому добавилось неблокирующее взаимодействие с сервером PostreSQL.
Код прототипа приведен в репозитории, в ветке aircraft. Приложение имеет два одинаковых CRUD API для сравнения. Оба частично собраны из одних и тех же классов при помощи контейнера внедрения зависимостей. Первый (порт 8085) реализован при помощи HTTP-сервера на базе файберов, в второй (порт 8086) работает на Nginx+PHP FPM. Оба пользуются таблицей aircraft на одном и том же сервере PostgreSQL. Код в репозитории содержит окружение на базе Docker-compose для запуска прототипа.
Конкурентное выполнение запросов
Описываемый файберный HTTP-сервер (порт 8085) содержит только один процесс PHP CLI, как можно убедиться, посмотрев файл docker-compose.yml:
services: php: command: php -f /app/bin/server.php
Для того, чтобы убедиться в конкурентности, по адресу /sleep добавлен контроллер следующего содержания:
class SleepController { private Db $db; public function __construct(Db $db) { $this->db = $db; } public function __invoke(ServerRequestInterface $request, FiberedHandler $fiberedHandler): ResponseInterface { $affectedRows = $this->db->execute($fiberedHandler, 'SELECT pg_sleep(1)'); $rows = $this->db->query($fiberedHandler, 'SELECT gen_random_uuid() AS uuid'); if (count($rows) !== 1 || !isset($rows[0]['uuid'])) { throw new LogicException('Something went wrong.'); } $uuid = $rows[0]['uuid']; return new TextResponse("Hello from Sleep request handler! Affected rows: $affectedRows. Generated uuid: $uuid"); } }
Если обработка запросов осуществлялась бы в последовательном режиме, то общее время ожидания ответа на, скажем, 10 параллельных HTTP-запросов было бы не менее 10 секунд, но представленный одиночный PHP CLI процесс справляется с этим за 2+ секунды. Это легко проверить, воспользовавшись утилитой ab:
$ ab -c 10 -n 10 http://localhost:8085/sleep Concurrency Level: 10 Time taken for tests: 2.029 seconds Complete requests: 10 Failed requests: 0 Total transferred: 1640 bytes HTML transferred: 1040 bytes Requests per second: 4.93 [#/sec] (mean) Time per request: 2028.604 [ms] (mean) Time per request: 202.860 [ms] (mean, across all concurrent requests) Transfer rate: 0.79 [Kbytes/sec] received
Это свидетельствует о том, что весомая часть этих запросов обрабатывается параллельно. Учитывая, что интерпретатор PHP из коробки не поддерживает параллельное выполнение кода, логично сделать вывод, что параллельность pg_sleep(1) здесь обеспечивается сервером PostgreSQL. Однако, сам по себе Postgres представленного результата не обеспечит. Остальную работу делает один постоянно функционирующий процесс PHP, используя файберы, цикл и неблокирующий ввод-вывод.
Сравнение файберного HTTP-сервера и сервера на базе PHP FPM на одном ядре
Каждый из них предоставляет абсолютно одинаковый набор CRUD-операций. В качестве метрики сравнения решено было выбрать время, затраченное на выполнение 4-х операций (создание, чтение, редактирование, удаление) в секунду. Для выполнения тестов используется JMeter. 50 параллельных потоков будут выполнять по 200 запросов каждый, с нулевым интервалом между запросами:
Файл проекта JMeter также можно найти в репозитории.
Как видно из результатов: ~58 крудов в секунду против 16 крудов в секунду на одном ядре.
С помощью cAdvisor, Prometheus и Grafana был также измерен объём памяти, который потреблялся каждой из двух версий во время трёх последовательных запусков:
Таким образом, файберный сервер с пулом соединений к БД потребляет в два раза меньше памяти делая ту же работу в 3 раза быстрее. Всё это делалось на 1 ядре для чистоты эксперимента. Конечно, FPM начнёт выигрывать, если дать ему несколько ядер, однако и для файберного сервера можно запустить несколько процессов за балансировщиком, которые смогут использовать преимущества многоядерных окружений. Так что сравнение на 1 ядре вполне оправдано. Если представленные результаты показались вам интересными, далее предлагаю кратко рассмотреть код прототипа.
Описание кода
Чтобы удобно сделать CRUD, в приложение со времени предыдущих статей были добавлены следующие части:
-
роутинг на базе FastRoute;
-
laminas/laminas-httphandlerrunner для выдачи PSR7 Response в приложении на базе PHP FPM.
-
симфонийский валидатор для валидации содержимого CRUD-запросов;
Для краткости статьи, не буду на них останавливаться, код в репозитории довольно очевиден в части их использования и схож с примерами из документации.
Наибольшего внимания в представленном коде, на мой взгляд, заслуживает слой взаимодействия с БД, а именно файл src/Db/AsyncDb.php. Вот в чём соль:
private function sendQuery( FiberedHandler $fiberedHandler, Connection $connection, Sock $connectionSocket, string $query, array $params ): void { if (!pg_send_query_params($connection, $query, $params)) { throw new DbException('Error while sending the query.'); } $start = time(); $readyTo = null; while (true) { if ($readyTo === FiberedHandler::SOCKET_READY_TO_READ) { App::debugFiber($fiberedHandler, 'Consuming query result.'); if (!pg_consume_input($connection)) { throw new DbException('Error while consuming the query result.'); } } $flush = pg_flush($connection); App::debugFiber($fiberedHandler, 'Flushing resulted with: %d', $flush); if ($flush === false) { throw new DbException('Error while sending the query.'); } elseif ($flush === true) { App::debugFiber($fiberedHandler, 'Flushing has been finished successfully.'); break; } $fiberedHandler->addReadSocket($connectionSocket); $fiberedHandler->addWriteSocket($connectionSocket); $readyTo = $fiberedHandler->suspend(); if ($fiberedHandler->isTimeoutReached($start)) { throw new DbException('Connection timed out while executing a database query.'); } } }
Вместо блокирующих вызовов драйвера PgSQL используются неблокирующие, а если результат ещё не готов, не ждём, а делаем другие, более полезные вещи при помощи файберов. Точнее, когда SQL-запрос отправляется на сервер БД с помощью функции pg_send_query_params() мы уже находимся в контексте файбера (для обработки каждого HTTP-запроса создаётся отдельный файбер) и просто делаем Fiber::suspend(), если отправка ещё не завершена. Приведённый фрагмент кода реализует только отправку запроса на сервер. Получение результата производится схожим способом с помощью функций pg_consume_input(), pg_connection_busy() и pg_get_result(). Эта последовательность рекомендована в документации к драйверу PostgreSQL libpq. Документация драйвера PosgreSQL для PHP не всегда достаточно подробна, но, к счастью, этот драйвер во многом повторяет интерфейс библиотеки libpq, которая снабжена гораздо более подробной документацией.
private function receiveQueryResultAndFetchAllRows( FiberedHandler $fiberedHandler, Connection $connection, Sock $connectionSocket ): array { $result = $this->receiveQueryResult($fiberedHandler, $connection, $connectionSocket); $rows = pg_fetch_all($result); if ($rows === false) { throw new DbException('Error while extracting the data from the query result.'); } if (!pg_free_result($result)) { throw new DbException('Error while freeing the query result.'); } return $rows; } private function receiveQueryResult( FiberedHandler $fiberedHandler, Connection $connection, Sock $connectionSocket ): Result { $start = time(); while (true) { App::debugFiber($fiberedHandler, 'Consuming query result.'); if (!pg_consume_input($connection)) { throw new DbException('Error while consuming the query result.'); } if (!pg_connection_busy($connection)) { App::debugFiber($fiberedHandler, 'The query result is ready. Going to read it immediately.'); break; } App::debugFiber($fiberedHandler, 'The connection is busy. Waiting for the data to read.'); $fiberedHandler->addReadSocket($connectionSocket); $fiberedHandler->suspend(); if ($fiberedHandler->isTimeoutReached($start)) { throw new DbException('Connection timed out while executing a database query.'); } } $result = pg_get_result($connection); if ($result === false) { throw new DbException('Error while getting the query result.'); } $errorMessage = pg_result_error($result); if (!empty($errorMessage)) { $sqlState = pg_result_error_field($result, PGSQL_DIAG_SQLSTATE); if (is_numeric($sqlState)) { $code = (int)$sqlState; } else { $code = 0; } throw new DbException(sprintf('Error while getting the query result: %s %s', $sqlState, $errorMessage), $code); } return $result; }
Эти методы помимо всего прочего требуют передачи объекта соединения с БД. Так как данное приложение это один постоянно живущий процесс, было бы странно создавать и разрывать соединения с БД на каждый HTTP-запрос. По этой причине был реализован пул соединений. Соединение запрашивается из пула перед выполнением SQL-запроса и возвращается в пул после завершения:
public function query(FiberedHandler $fiberedHandler, string $query, array $params = []): array { if (str_contains($query, ';')) { throw new InvalidArgumentException('Multiple queries are unsupported.'); } try { $connection = $this->connectionPool->obtainConnection($fiberedHandler); } catch (DbException $e) { App::debugFiber($fiberedHandler, 'Error getting a database connection instance.'); throw $e; } try { $connectionSocket = $this->extractSocket($connection); $this->sendQuery($fiberedHandler, $connection, $connectionSocket, $query, $params); $rows = $this->receiveQueryResultAndFetchAllRows($fiberedHandler, $connection, $connectionSocket); $this->connectionPool->freeConnection($connection); //DO NOT FORGET TO RETURN THE CONNECTION BACK TO THE POOL!!! } catch (DbException $e) { App::debugFiber($fiberedHandler, 'DbException occurred. Going to close the database connection: %s', $e->getMessage()); $this->connectionPool->closeConnection($connection); throw $e; } return $rows; }
Код самого пула соединений довольно прост, по сути это коллекция соединений с методами доступа:
<?php class ConnectionPool { /** @var Connection[] */ private array $freeConnections = []; /** @var array<int, Connection>*/ private array $obtainedConnections = []; /** * @throws DbException */ public function obtainConnection(FiberedHandler $fiberedHandler): Connection { if (!empty($this->freeConnections)) { /** @var Connection $connection */ $connection = array_shift($this->freeConnections); $connectionStatus = pg_connection_status($connection); App::debugFiber($fiberedHandler, 'A free connection found. Status: %d. Pid: %d', $connectionStatus, pg_get_pid($connection)); if ($connectionStatus !== PGSQL_CONNECTION_OK) { pg_close($connection); App::debugFiber($fiberedHandler, 'Postgresql connection is not ok: %d', $connectionStatus); $connection = $this->initConnection($fiberedHandler); } } else { App::debugFiber($fiberedHandler, 'There is no free connection. Will try to init a new one.'); $connection = $this->initConnection($fiberedHandler); } $this->obtainedConnections[spl_object_id($connection)] = $connection; return $connection; } public function freeConnection(Connection $connection): void { unset($this->obtainedConnections[spl_object_id($connection)]); $this->freeConnections[] = $connection; } public function closeConnection(Connection $connection): void { unset($this->obtainedConnections[spl_object_id($connection)]); pg_close($connection); } /** * @throws DbException */ public function initConnection(FiberedHandler $fiberedHandler): Connection { App::debugFiber($fiberedHandler, 'Postgresql connection initialization started.'); $start = time(); $connection = pg_connect(getenv('PG_CONN_STR'), PGSQL_CONNECT_ASYNC | PGSQL_CONNECT_FORCE_NEW); if ($connection === false) { $lastError = pg_last_error(); App::debugFiber($fiberedHandler, 'Postgresql connect error: %s', $lastError); throw new DbException(sprintf("Postgresql connect error: %s", $lastError)); } while (true) { $connectionStatus = pg_connect_poll($connection); App::debugFiber($fiberedHandler, 'Postgresql connection status: %d', $connectionStatus); if (in_array($connectionStatus, [PGSQL_POLLING_READING, PGSQL_POLLING_WRITING])) { $pgSocket = Sock::fromStream(pg_socket($connection)); //let's see what will happen using this bunch of function calls if ($connectionStatus === PGSQL_POLLING_READING) { $fiberedHandler->addReadSocket($pgSocket); } else { $fiberedHandler->addWriteSocket($pgSocket); } App::debugFiber($fiberedHandler, 'Postgresql connection is not ready yet. Suspending fiber.'); Fiber::suspend(); if ($fiberedHandler->isTimeoutReached($start)) { App::debugFiber($fiberedHandler, 'Connection timed out while connecting to Postgresql. Reason: %s', pg_last_error($connection)); throw new DbException('Connection timed out while connecting to Postgresql.'); } continue; } if ($connectionStatus === PGSQL_POLLING_FAILED) { $lastError = pg_last_error($connection); App::debugFiber($fiberedHandler, 'Postgresql connect error: %s', $lastError); throw new DbException(sprintf("Postgresql connect error: %s", $lastError)); } elseif ($connectionStatus === PGSQL_POLLING_OK) { App::debugFiber($fiberedHandler, 'Postgresql connection initialized. Connection pid: %s', pg_get_pid($connection)); break; } else { App::debugFiber($fiberedHandler, 'Unknown Postgresql connection state: %s', $connectionStatus); throw new DbException('Unknown Postgresql connection state.'); } } return $connection; } }
Как и в случае с выполнением SQL-запросов, подключение к серверу осуществляется неблокирующим способом, с переключением на другие задачи в случае неготовности подключения. Готовность подключения проверяется с помощью вызова функции pg_connect_poll(), цикл и приостановка файбера также на месте. При запросе свободного подключения клиентским кодом, подключение-кандидат проверяется на валидное состояние с помощью pg_connection_status(), при необходимости создаётся новое подключение.
Далее имеет смысл рассмотреть код сервиса, который вызывается из контроллеров, и собственно, реализует CRUD. Прошу заметить, что я знаю, что SQL-запросам не место в сервисном слое, но в прототипе сделано именно так для краткости. Рассмотрим метод, реализующий добавление записи. Как можно видеть, его код не содержит никаких особенностей, намекающих не конкурентное выполнение или использование неблокирующих вызовов, кроме передачи $fiberedHandlerв нижележащий слой.
public function add(AircraftDto $dto, FiberedHandler $fiberedHandler): void { try { $insertedRows = $this->db->execute( $fiberedHandler, 'INSERT INTO aircraft( number, title, serial_number, assembly_date, added_at ) VALUES($1, $2, $3, $4, $5)', [ $dto->number, $dto->title, $dto->serialNumber, $dto->assemblyDate->format('Y-m-d'), $dto->addedAt->format('Y-m-d') ] ); if ($insertedRows !== 1) { throw new DomainException('Error while adding new aircraft.'); } } catch (DbException $e) { if ($e->isDuplicateKey()) { throw new DuplicateEntityException('This aircraft has been already added.', $e->getCode(), $e); } throw new DomainException('Something went wrong.', $e->getCode(), $e); } }
Этот код можно использовать и в неконкурентном приложении, передав заглушку в качестве значения $fiberedHandler, что и делается в PHP FPM части прототипа: сервис, котроллеры и код диспетчера HTTP-запросов на базе FastRoute используются как в файберном сервере, так и в сервере на PHP FPM чтобы сэкономить время и объем прототипа. Остальной код либо описан в предыдущих статьях, либо довольно лёгок для понимания, так что не стану на нём останавливаться.
Выводы
Говоря упрощённо, блокирующие вызовы драйверов БД или блокирующего(синхронного) ввода-вывода под капотом делают то же самое что и неблокирующие(асинхронные) вызовы, но с ожиданием результата. Такое ожидание для каждого блокирующего вызова происходит на стороне ядра, вызывающий процесс при этом вынужден ждать. В представленном примере блокирование выполняется для всех сокетов в приложении вместе с помощью libev (расширения ev) , что даёт возможность сделать свой цикл событий ввода-вывода, а вместо использования системного планировщика для переключения между множеством блокирующихся процессов управляемых FPM — организовать кооперативную многозадачность между файберами. Если не будет ввода-вывода ни на одном из сокетов, то приложение будет заблокировано и ничего не будет делать. Подход «один персональный блокируемый процесс или поток для обработки каждого отдельного HTTP-запроса» не слишком уж эффективен. Процессы и потоки лучше не оставлять простаивать в ожидании, а разработчикам лучше не заниматься выматывающим поиском багов, возникающих из-за проблем с thread-safety в случае использования потоков. Если нужно вычислить что-то объёмное (например, пережать изображение или даже сформировать объёмный HTML или XML, использовать Twig-шаблон), это всегда можно сделать через сервер заданий: отправить такую работу какому-либо вычисляющему воркеру и ожидать результата в неблокирующем режиме, занимаясь обработкой других запросов. Я планирую написать следующую статью об использовании сервера обработки заданий совместно с данным прототипом.
К сожалению, для MySQL реализовать подобное пока не представляется мне возможным. Драйвер mysqli хоть и содержит функции для асинхронного выполнения запросов, но не предоставляет при этом никакого аналога функции pg_socket(), который позволял бы получить доступ к системному сокету соединения с БД. Остаётся надеяться, что в будущем такая функция появится.
Спасибо за чтение и до новых встреч на страницах Хабра;)
Полезные ссылки
ссылка на оригинал статьи https://habr.com/ru/articles/902572/
Добавить комментарий