Как подружить Celery и SqlAlchemy 2.0 с асинхронным Python

от автора

Последние полгода я начал задумываться о том, чтобы уходить с любимого Python куда‑нибудь в сторону Rust или Go, потому что, как ни крути, на нём становится писать больновато, когда дело касается каких‑то более «интересных» задач. Со мной, конечно, многие поспорят, но я продолжу смотреть на оборачивание всего, что заблокирует GIL, в различные функции библиотек asyncio или threading, как один большой костыль относительно эстетичного синтаксиса Python.

Недавно, я столкнулся с задачей, когда с проекта на Python нужно было стряхнуть пыли и заставить работать чуточку производительнее. В следствии чего монолит был распилен на микросервисы, а брокером между сервисами стали всем знакомый RabbitMQ и такой же старый как сам Python — Celery. Проект был перенесен с Django на FastAPI, который по-моему субъективному мнению является идеальным решением для любых бэкендов на Python, если мы не говорим о чём-то высоконагруженном, где с питона стоит слезть на другой язык. Вообще, микросервисы это то, что даёт возможность разработать большую часть кодовой базы дёшево, выделив уязвимые места в микросервисы на других языках.

Начнём с конфигурации docker-compose файла:

version: '3.8'  services:   db:     image: postgres:15.1-alpine     env_file:       - ./.env     volumes:       - postgres_data:/var/lib/postgresql/data/    app:     build: ./backend     depends_on:       - db     env_file:       - ./.env     ports:       - "8000:8000"     volumes:       - ./backend/src:/app/    ...    rabbit:     image: rabbitmq:3.11.9-management     hostname: rabbit     environment:       - RABBITMQ_DEFAULT_USER=admin       - RABBITMQ_DEFAULT_PASS=admin       - RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit disk_free_limit 1147483648     volumes:       - ./rabbitmq:/var/lib/rabbitmq    flower:     image: mher/flower     environment:       - CELERY_BROKER_URL=amqp://admin:admin@rabbit:5672//       - RESULT_BACKEND=rpc://       - FLOWER_PORT=5555     ports:       - "5555:5555"     depends_on:       - rabbit    worker:     build: ./backend     command: python -m celery -A celery_app.celery worker --loglevel=info -Q celery --logfile=celery_app/celery.log     volumes:       - ./backend/src:/app/     env_file:       - ./.env     depends_on:       - rabbit     environment:       - C_FORCE_ROOT=yes

Для мониторинга задач Celery использовал опять же всем знакомый и до боли простой Flower. Так же дополнительным аргументом для RabbitMQ использовал disk_free_limit для того чтобы растянуть максимально допустимый под сообщения объем памяти. Заострять внимание на каждом Dockerfile я не буду, потому что ничего специфического там нету. Касаемо конфигурации Celery, тоже ничего сложного нету, мануалов полно в интернете. Так что перейдем сразу в сути проблемы, того, с чем конкретно у меня возникли сложности.

Моя реализация подключения к базе данных через алхимию выглядит следующим образом:

engine = create_async_engine(     DATABASE_URL,     echo=True )  session: async_sessionmaker[AsyncSession] = async_sessionmaker(     engine,     expire_on_commit=False )

К моему разочараванию в Celery так и не появилось ничего нового и интересного. Для того, чтобы использовать асинхронную сессию необходимо использовать асинхронные функции, а значит необходимо обернуть эту функцию во что-то, чтобы celery не ругался.

Первым делом я получил loop в глобальной области моего файла tasks.py, который хранил в себе все таски для Celery (у меня их, если что всего 4). Выглядело это так:

loop = asyncio.get_event_loop()

Так же мою сессию необходимо было обернуть в функцию async_scoped_session, чтобы избежать ошибок связанных с одновременным подключением к сессии нескольких instanc’ов приложений (воркера и самого FastAPI). Выглядела она следующим образом:

@asynccontextmanager async def scoped_session():     scoped_factory = async_scoped_session(         session,         scopefunc=current_task,     )     try:         async with scoped_factory() as s:             yield s     finally:         await scoped_factory.remove()

Важным в этом коде является то, что мы в конце вызываем метод .remove() необходимый для корректного завершения сессии, что применительно только к scoped session (в случае обычной сессии всю работу с открытием и закрытием скрывает под собой контекстный менеджер). Подробнее про это вы можете почитать в документации. После всех проделанных операций теперь мы можем использовать нашу сессию с помощью:

async with scoped_session() as s:     await s.execute(...)

Что касается Celery, т.к. мы не имеем возможности использовать async функции, то нам нужно будет вынести всю асинхронщину в отдельные функции и воспользоваться тем самым loop, лежащим в tasks.py. В таком случае наша таска будет выглядеть примерно таким образом

@shared_task(     bind=True,     name='celery:test' ) def test_task(self, data: dict, prices: dict):   result = loop.run_until_complete(здесь_ваша_асинхнонная_функция(и, аргументы))   return result

После всех проделанных манипуляций, всё завелось и работает корректно и быстро. Если у кого-то есть идеи лучшей реализации — милости прошу в комментарии. Также обращу внимание, что я не считаю свой вариант самым правильным, потому что всегда найдётся человек, знающий в конкретной области больше меня или тебя. Надеюсь на объективную критику и хоть какую-то пользу от написанного. Подписывайтесь, ставьте лайки, всем удачи, всем пока!


ссылка на оригинал статьи https://habr.com/ru/post/721186/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *