Celery + asyncio

от автора

Привет, Хабр! Я хочу рассказать, как я решал проблему эффективного конкурентного исполнения asyncio задач в Celery.

КДПВ

Введение

Celery — большой проект со сложной историей и тяжким бременем обратной совместимости. Ключевые архитектурные решения принимались задолго до появления asyncio в python. Поэтому тем более интересно, что запустить как-нибудь asyncio-задачу можно в celery из коробки.

Зануда mode on

Формально после #839. Субъективно для меня переименование пакета не меняет архитектуру приложения. Зануда mode off.

Запускаем asyncio задачи в ванильном celery

Запустить asyncio задачу можно из коробки:

import asyncio  from .celeryapp import celeryapp   async def async_task():     await asyncio.sleep(42)   @celeryapp.task def regular_task():     coro = async_task()     asyncio.run(coro) 

Очевидные плюсы:

  • Работает же!
  • Просто
  • Нет дополнительных внешних зависимостей

Что, собственно, не так?

  • Event Loop создаётся внутри каждого воркера
  • Переключения контекста между исполняющимися корутинами не происходит
  • В один момент времени воркер исполняет не более одной корутины
  • Общие ресурсы не шарятся между задачами
  • Бойлерплейт

То есть asyncio в данном случае используется ради asyncio, а преимуществ никаких

Попробуем хитрее?

Я боролся за производительность:

import asyncio import threading  from .celeryapp import celeryapp  celeryapp.loop = asyncio.get_event_loop() celeryapp.loop_runner = threading.Thread(     target=celeryapp.loop.run_forever,     daemon=True, ) celeryapp.loop_runner.start()  async def async_task():     await asyncio.sleep(42)   @celeryapp.task def regular_task():     coro = async_task()     asyncio.run_coroutine_threadsafe(         coro=coro,         loop=celeryapp.loop,     ) 

Бинго! Плюсы:

  • Всё ещё работает
  • Даже более-менее эффективно
  • Даже ресурсы шарятся между корутинами в пределах воркера
  • Всё ещё нет дополнительных внешних зависимостей

Бинго? Проблемы не заставили себя ждать:

  • Celery не знает ничего про запущенную корутину
  • Вы теряете контроль над исполнением таски
  • Вы теряете контроль над исключениями
  • Бойлерплейт

Идти с таким чудесным инженерным решением в прод у меня как-то рука не поднялась

Постановка задачи

  • Должно работать
  • Корутины должны исполняться конкурентно
  • Ресурсы должны шариться между множеством исполняемых тасок
  • Никакого бойлерплейта
  • Простой предсказуемый API

То есть, мои ожидания:

import asyncio  from .celeryapp import celeryapp   @celeryapp.task async def async_task():     await asyncio.sleep(42) 

Итоги

Свои идеи я реализовал в библиотеке celery-pool-asyncio. Эта библиотека используется используется нашей командой в текущем проекте, и мы уже выкатились в прод.

Коротко о возможностях

Помимо непосредственно исполнения asyncio задач, celery-pool-asyncio также решает проблемы:

  • Шедулинг асинхронных задач
  • Поддержка корутин в сигналах celery

Для того, чтобы заставить celery-pool-asyncio работать, я использовал monkey patching. Для каждого применяемого в рантайме патча предусмотрена возможность отключения.

Обо всём этом подробнее можно прочитать в документации

Планы

По-хорошему, нужно интегрировать пул в celery. С другой стороны, разработчики celery во всю прототипируют celery 5.0, который будет асинхронным. Стоит ли игра свеч?

Примеры

Для демонстрации возможностей моих библиотек celery-pool-asyncio и celery-decorator-taskcls (статья) был реализован тестовый проект.

Прочее

Я пытался то же самое рассказать на митапе

ссылка на оригинал статьи https://habr.com/ru/post/502380/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *