Немного фактов о python asyncio

от автора

Всем привет! Хотелось бы поделиться опытом использования python asyncio. За полтора года использования в продакшене накопился некоторый опыт, общие приемы, облегчающие жизнь. Естественно, были и грабли, о которых также стоит упомянуть, ибо это поможет сэкономить кучу времени тем, кто только начинает использовать в своих приложениях asyncio. Кому интересно — прошу под кат.

Немного истории

Asyncio появился в Pyhton версии 3.4, в 3.5 был добавлен более приятный глазу async/await синтаксис. Asyncio предоставляет из коробки Event loop, Future, Task, Coroutine, I/O multiplexing, Synchronization primitives. Это, конечно, не мало, но для полноценной разработки недостаточно. Для этого есть сторонние библиотеки. Отличная подборка есть вот тут. У себя в компании мы используем asyncio вместе с набором сторонних библиотек для написания микросервисов. По своей природе наши сервисы больше ориентированы на I/O нежели на CPU, так что для нас asyncio отлично подходит.

Собственно факты

Это не учебник по asyncio. Я не буду объяснять, почему асинхронный ввод/вывод это хорошо, или почему бы не использовать потоки. Не будет рассказов о корутинах, генераторах, event loop’ах и т.д. Также тут не будет никаких бенчмарков и сравнений с другими языками. Поехали!

Debug

Во-первых, PYTHONASYNCIODEBUG. Это переменная окружения, которая включает дебаг режим. Например, можно увидеть сообщения о том, что вы объявили функцию как корутину, но вызываете как обычную функцию(актуально для python3.4). Также необходимо настроить asyncio logger на уровень дебаг и еще разрешить вывод ResourseWarning. Можно увидеть много интересного: сообщения о том, что вы забыли закрыть транспорт или сам event loop(читай — забыли освободить ресурсы). Сравните запуск следующего кода с параметром интерпретатора -Wdefault и переменной окружения PYTHONASYNCIODEBUG=1 и без них (здесь и далее в примерах кода я буду опускать некоторые несущественные части такие как import или обработка исключений):

@asyncio.coroutine def test():     pass  loop = asyncio.get_event_loop() test() 

Правильное завершение

Кстати об освобождении ресурсов. Event loop надо уметь правильно остановить, дождавшись корректного заверешения все тасок, закрытия соединений и т.д. И если с использованием run_until_complete() особых проблем нет, то с run_forever() все немного сложнее. Метод close() у event loop’а можно вызвать, только если он уже остановлен — т.е. после метода stop(). Лучше всего это сделать с помощью сигналов:

def handler(loop):     loop.remove_signal_handler(signal.SIGTERM)     loop.stop()  loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGTERM, handler, loop)  try:     loop.run_forever() finally:     loop.close() 

Далее в примерах кода я все же буду концентрироваться на самой сути, а не на правильном завершении программы.

Запуск блокирующего кода

Естественно, не для всего есть асинхронные библиотеки. Некоторый код так и остается блокирующим, и его надо как-то запускать, чтобы он не блокировал наш event loop. Для этого есть хороший метод run_in_executor(), который запускает то, что вы ему передали в одном из потоков встроенного пула, не блокируя основной поток с event loop’ом. Все бы хорошо, но с этим есть 2 проблемы. Во-первых, размер стандартного пула всего 5. Во-вторых, в asyncio синхронный dns resolver, который запускается именно таким образом во встроенном пуле. Значит, за пул всего в 5 потоков будут конкурировать ваши синхронные операции, плюс все кому надо сделать getaddrinfo(). Выход — использовать свой пул. Всегда:

def blocking_function():     time.sleep(42)  pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) loop = asyncio.get_event_loop() loop.run_in_executor(pool, blocking_function()) loop.close() 

Коварные Future

У Future есть одна очень интересная особенность: если в ней произойдет исключение — вы об этом ничего не узнаете, если только явно не спросите об этом у самой future. В документации есть хороший пример на эту тему. Вы увидите, что было исключение, только когда gc будет удалять объект future. Отсюда следует простое правило — всегда проверяете результат вашей future. Даже если по вашей задумке код внутри future должен просто крутиться в бесконечном цикле, и, казалось бы, негде проверять результат — все равно надо обработать исключения, например так:

async def handle_exception():     try:         await bug()     except Exception:         print('TADA!')  async def bug():     raise Exception()  loop = asyncio.get_event_loop() loop.create_task(handle_exception()) loop.run_forever() loop.close() 

await и __init__()

Невозможно. Магический метод __init__() не может содержать асинхронный код. Есть два пути. Или сделать у класса еще один метод, например, initialize(), который уже будет корутиной. Он будет содержать весь асинхронный код для инициализации, и его надо будет вызывать после создания объекта. Выглядит ужасно. Поэтому принято использовать функции-фабрики. Поясню на примере:

class Foo:     def __init__(self, reader, writer, loop, *args, **kwargs):         self._reader = reader         self._writer = writer         self._loop = loop  async def create_foo(loop):     reader, writer = await asyncio.open_connection('127.0.0.1', 8888, loop=loop)     return Foo(reader, writer, loop)  loop = asyncio.get_event_loop() foo = loop.run_until_complete(create_foo(loop)) print(foo) loop.close() 

Wake up, Neo

Скажем, у вас есть таска, которая крутится в event loop’е и периодически сбрасывает какой-нибудь буфер. Можно написать такой код:

async def flush_task():     while True:         # flushing...         await asyncio.sleep(FLUSH_TIMEOUT) 

Сделать create_task() — и все вроде бы хорошо, кроме одного: что делать, если по завершении вам необходимо принудительно сбросить содержимое буфера? Как заставить таску «проснусться»? Тут на помощь приходят примитивы синхронизации:

class Foo:      def __init__(self, loop, *args, **kwargs):         self._loop = loop         self._waiter = asyncio.Event()         self._flush_future = self._loop.create_task(self.flush_task())      async def flush_task(self):         while True:             try:                 await asyncio.wait_for(self._waiter.wait(), timeout=FLUSH_TIMEOUT, loop=self._loop)             except asyncio.TimeoutError:                 pass             # flushing ...             self._waiter.clear()      def force_flush():         self._waiter.set()  loop = asyncio.get_event_loop() foo = Foo(loop) loop.run_forever() loop.close() 

Тестирования

Тестировать асинхронный код можно и нужно. И делать это так же просто, как и в случае синхронного кода:

class TestCase(unittest.TestCase):      def setUp(self):         self.loop = asyncio.new_event_loop()         asyncio.set_event_loop(None)      def tearDown(self):         self.loop.close()      def test_001(self):         async def func():             self.assertEqual(42, 42)         self.loop.run_until_complete(func()) 

Тесты отлично изолированы, т.к. в каждом новом тесте используется свой event loop. А можно пойти дальше и использовать pytest, где есть удобные декораторы.

Источники вдохновения

Прежде всего — личный опыт. Многое из перечисленного было осознано в результате «ловли граблей», а затем изучения документации и исходников asyncio. Также отличными примерами послужили исходники популярных библиотек, таких как aiohttp, aioredis, aiopg.

Спасибо всем, кто дочитал статью до конца. Удачи с asyncio!
ссылка на оригинал статьи https://habrahabr.ru/post/314606/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *