Всем привет! Хотелось бы поделиться опытом использования python asyncio. За полтора года использования в продакшене накопился некоторый опыт, общие приемы, облегчающие жизнь. Естественно, были и грабли, о которых также стоит упомянуть, ибо это поможет сэкономить кучу времени тем, кто только начинает использовать в своих приложениях asyncio. Кому интересно — прошу под кат.
Немного истории
Asyncio появился в Pyhton версии 3.4, в 3.5 был добавлен более приятный глазу async/await синтаксис. Asyncio предоставляет из коробки Event loop, Future, Task, Coroutine, I/O multiplexing, Synchronization primitives. Это, конечно, не мало, но для полноценной разработки недостаточно. Для этого есть сторонние библиотеки. Отличная подборка есть вот тут. У себя в компании мы используем asyncio вместе с набором сторонних библиотек для написания микросервисов. По своей природе наши сервисы больше ориентированы на I/O нежели на CPU, так что для нас asyncio отлично подходит.
Собственно факты
Это не учебник по asyncio. Я не буду объяснять, почему асинхронный ввод/вывод это хорошо, или почему бы не использовать потоки. Не будет рассказов о корутинах, генераторах, event loop’ах и т.д. Также тут не будет никаких бенчмарков и сравнений с другими языками. Поехали!
Debug
Во-первых, PYTHONASYNCIODEBUG. Это переменная окружения, которая включает дебаг режим. Например, можно увидеть сообщения о том, что вы объявили функцию как корутину, но вызываете как обычную функцию(актуально для python3.4). Также необходимо настроить asyncio logger на уровень дебаг и еще разрешить вывод ResourseWarning. Можно увидеть много интересного: сообщения о том, что вы забыли закрыть транспорт или сам event loop(читай — забыли освободить ресурсы). Сравните запуск следующего кода с параметром интерпретатора -Wdefault и переменной окружения PYTHONASYNCIODEBUG=1 и без них (здесь и далее в примерах кода я буду опускать некоторые несущественные части такие как import или обработка исключений):
@asyncio.coroutine def test(): pass loop = asyncio.get_event_loop() test()
Правильное завершение
Кстати об освобождении ресурсов. Event loop надо уметь правильно остановить, дождавшись корректного заверешения все тасок, закрытия соединений и т.д. И если с использованием run_until_complete() особых проблем нет, то с run_forever() все немного сложнее. Метод close() у event loop’а можно вызвать, только если он уже остановлен — т.е. после метода stop(). Лучше всего это сделать с помощью сигналов:
def handler(loop): loop.remove_signal_handler(signal.SIGTERM) loop.stop() loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGTERM, handler, loop) try: loop.run_forever() finally: loop.close()
Далее в примерах кода я все же буду концентрироваться на самой сути, а не на правильном завершении программы.
Запуск блокирующего кода
Естественно, не для всего есть асинхронные библиотеки. Некоторый код так и остается блокирующим, и его надо как-то запускать, чтобы он не блокировал наш event loop. Для этого есть хороший метод run_in_executor(), который запускает то, что вы ему передали в одном из потоков встроенного пула, не блокируя основной поток с event loop’ом. Все бы хорошо, но с этим есть 2 проблемы. Во-первых, размер стандартного пула всего 5. Во-вторых, в asyncio синхронный dns resolver, который запускается именно таким образом во встроенном пуле. Значит, за пул всего в 5 потоков будут конкурировать ваши синхронные операции, плюс все кому надо сделать getaddrinfo(). Выход — использовать свой пул. Всегда:
def blocking_function(): time.sleep(42) pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) loop = asyncio.get_event_loop() loop.run_in_executor(pool, blocking_function()) loop.close()
Коварные Future
У Future есть одна очень интересная особенность: если в ней произойдет исключение — вы об этом ничего не узнаете, если только явно не спросите об этом у самой future. В документации есть хороший пример на эту тему. Вы увидите, что было исключение, только когда gc будет удалять объект future. Отсюда следует простое правило — всегда проверяете результат вашей future. Даже если по вашей задумке код внутри future должен просто крутиться в бесконечном цикле, и, казалось бы, негде проверять результат — все равно надо обработать исключения, например так:
async def handle_exception(): try: await bug() except Exception: print('TADA!') async def bug(): raise Exception() loop = asyncio.get_event_loop() loop.create_task(handle_exception()) loop.run_forever() loop.close()
await и __init__()
Невозможно. Магический метод __init__() не может содержать асинхронный код. Есть два пути. Или сделать у класса еще один метод, например, initialize(), который уже будет корутиной. Он будет содержать весь асинхронный код для инициализации, и его надо будет вызывать после создания объекта. Выглядит ужасно. Поэтому принято использовать функции-фабрики. Поясню на примере:
class Foo: def __init__(self, reader, writer, loop, *args, **kwargs): self._reader = reader self._writer = writer self._loop = loop async def create_foo(loop): reader, writer = await asyncio.open_connection('127.0.0.1', 8888, loop=loop) return Foo(reader, writer, loop) loop = asyncio.get_event_loop() foo = loop.run_until_complete(create_foo(loop)) print(foo) loop.close()
Wake up, Neo
Скажем, у вас есть таска, которая крутится в event loop’е и периодически сбрасывает какой-нибудь буфер. Можно написать такой код:
async def flush_task(): while True: # flushing... await asyncio.sleep(FLUSH_TIMEOUT)
Сделать create_task() — и все вроде бы хорошо, кроме одного: что делать, если по завершении вам необходимо принудительно сбросить содержимое буфера? Как заставить таску «проснусться»? Тут на помощь приходят примитивы синхронизации:
class Foo: def __init__(self, loop, *args, **kwargs): self._loop = loop self._waiter = asyncio.Event() self._flush_future = self._loop.create_task(self.flush_task()) async def flush_task(self): while True: try: await asyncio.wait_for(self._waiter.wait(), timeout=FLUSH_TIMEOUT, loop=self._loop) except asyncio.TimeoutError: pass # flushing ... self._waiter.clear() def force_flush(): self._waiter.set() loop = asyncio.get_event_loop() foo = Foo(loop) loop.run_forever() loop.close()
Тестирования
Тестировать асинхронный код можно и нужно. И делать это так же просто, как и в случае синхронного кода:
class TestCase(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(None) def tearDown(self): self.loop.close() def test_001(self): async def func(): self.assertEqual(42, 42) self.loop.run_until_complete(func())
Тесты отлично изолированы, т.к. в каждом новом тесте используется свой event loop. А можно пойти дальше и использовать pytest, где есть удобные декораторы.
Источники вдохновения
Прежде всего — личный опыт. Многое из перечисленного было осознано в результате «ловли граблей», а затем изучения документации и исходников asyncio. Также отличными примерами послужили исходники популярных библиотек, таких как aiohttp, aioredis, aiopg.
Спасибо всем, кто дочитал статью до конца. Удачи с asyncio!
ссылка на оригинал статьи https://habrahabr.ru/post/314606/
Добавить комментарий