Нативный non-blocking I/O через Linux Epoll: создание C-расширений для Python

от автора

Привет, Хабр!

Если вам приходилось писать высоконагруженные сетевые приложения на Python, то вы, скорее всего, сталкивались с тем, что стандартные механизмы работы с вводом‑выводом — select(), poll() и даже asyncio — не справляются с большой нагрузкой. select() быстро превращается в бутылочное горлышко из‑за линейной сложности O(N), poll() всё ещё требует перебора всех файловых дескрипторов, а asyncio, хоть и удобен, но не всегда даёт ту производительность, которую можно получить, если работать напрямую с системными вызовами.

Здесь, на мой взгляд хорошо подойдет epoll — механизм в Linux, который позволяет асинхронно отслеживать события на файловых дескрипторах без постоянного опроса. В отличие от select() и poll(), epoll сообщает процессу о событиях только тогда, когда они происходят, что снижает нагрузку на CPU. Именно поэтому он используется в NGINX, HAProxy и других высоконагруженных системах. Однако, стандартный Python не даёт удобного низкоуровневого интерфейса для работы с epoll, а значит, можно написать собственное C‑расширение, которое позволит вызывать epoll_wait() напрямую.

Создание C-расширения для Python

Создадим файловую структуру проекта:

pyepoll/ │── epollmodule.c    # C-код для работы с epoll │── setup.py         # Сборка C-расширения │── test.py          # Тестирование производительности

В epollmodule.c будет реализован C‑модуль, в setup.py — инструкции для сборки, а test.py позволит проверить работоспособность epoll в Python.

Создадим файл epollmodule.c, в котором реализуем три ключевые функции:

  • py_epoll_create() — создаёт epoll‑инстанс.

  • py_epoll_ctl() — управляет файловыми дескрипторами.

  • py_epoll_wait() — ожидает событий.

Функция epoll_create1() будет создавать новый epoll‑дескриптор. В случае ошибки поднимаем исключение OSError в Python.

#define PY_SSIZE_T_CLEAN #include <Python.h> #include <sys/epoll.h> #include <unistd.h>  static PyObject* py_epoll_create(PyObject* self, PyObject* args) {     int epfd = epoll_create1(0);     if (epfd == -1) {         PyErr_SetFromErrno(PyExc_OSError);         return NULL;     }     return PyLong_FromLong(epfd); }

При вызове epoll.create() в Python эта функция возвращает файловый дескриптор epoll.

Функция epoll_ctl() будет добавлять, изменять и удалять файловые дескрипторы из epoll‑инстанса.

static PyObject* py_epoll_ctl(PyObject* self, PyObject* args) {     int epfd, fd, op, events;     if (!PyArg_ParseTuple(args, "iiii", &epfd, &fd, &op, &events)) {         return NULL;     }      struct epoll_event ev;     ev.events = events;     ev.data.fd = fd;      if (epoll_ctl(epfd, op, fd, &ev) == -1) {         PyErr_SetFromErrno(PyExc_OSError);         return NULL;     }     Py_RETURN_NONE; }

Когда в файловом дескрипторе происходят изменения (например, поступили данные в сокет), epoll_wait() будет возвращать список событий.

static PyObject* py_epoll_wait(PyObject* self, PyObject* args) {     int epfd, maxevents, timeout;     if (!PyArg_ParseTuple(args, "iii", &epfd, &maxevents, &timeout)) {         return NULL;     }      struct epoll_event events[maxevents];     int nfds = epoll_wait(epfd, events, maxevents, timeout);     if (nfds == -1) {         PyErr_SetFromErrno(PyExc_OSError);         return NULL;     }      PyObject* result = PyList_New(nfds);     for (int i = 0; i < nfds; i++) {         PyObject* tuple = Py_BuildValue("(ii)", events[i].data.fd, events[i].events);         PyList_SetItem(result, i, tuple);     }     return result; }

Сборка C-расширения

Создадим setup.py для сборки нашего модуля:

from setuptools import setup, Extension  module = Extension("epoll", sources=["epollmodule.c"])  setup(     name="pyepoll",     version="1.0",     description="Epoll non-blocking I/O для Python",     ext_modules=[module], )

Собираем модуль:

python setup.py build_ext --inplace

Тестирование

Теперь создадим сервер, который обрабатывает соединения с использованием epoll:

import epoll import socket import os  server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(("127.0.0.1", 8000)) server.listen(10) server.setblocking(False)  epfd = epoll.create() epoll.ctl(epfd, server.fileno(), 1, 1)  while True:     events = epoll.wait(epfd, 10, 1000)     for fd, event in events:         if fd == server.fileno():             conn, addr = server.accept()             conn.setblocking(False)             epoll.ctl(epfd, conn.fileno(), 1, 1)         else:             data = os.read(fd, 1024)             if data:                 print("Получено:", data.decode())             else:                 os.close(fd)

Если клиент подключился и отправил «Привет, сервер!», сервер выведет:

Новое соединение: ('127.0.0.1', 54321) Получено: Привет, сервер!

Если клиент отключился, сервер закроет файловый дескриптор:

Новое соединение: ('127.0.0.1', 54321) Закрываем соединение

Если несколько клиентов подключаются одновременно:

Новое соединение: ('127.0.0.1', 54321) Получено: Клиент 1 Новое соединение: ('127.0.0.1', 54322) Получено: Клиент 2

19 февраля в OTUS пройдёт онлайн-лекция на тему «Паттерны системы декомпозиции на микросервисах — как проектировать масштабируемую архитектуру».

Разберём ключевые принципы, обеспечение правильного разделения монолитных приложений на сервисах, особенности организации взаимодействия между микросервисами, а также практики, помогающие избежать распространённых ошибок. Записаться можно на странице онлайн‑курса «Highload Architect».


ссылка на оригинал статьи https://habr.com/ru/articles/881804/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *