sched – планировщик событий из Python
Модуль sched входит в состав стандартной библиотеки Python и обеспечивает простой механизм для планирования событий в программе. Этот модуль может работать в приложении на FastAPI, но пользоваться им не рекомендуется, так как он слишком прост, и функциональность его ограничена.
Вот как при помощи sched можно запланировать задачу в приложении на FastAPI:
import sched import time scheduler = sched.scheduler(time.time, time.sleep) def run_me_every_minute(): print("Running every minute...") def schedule_next_event(): scheduler.enter(60, 1, run_me_every_minute) scheduler.run() scheduler.enter(60, 1, run_me_every_minute) while True: schedule_next_event()
В вышеприведённом коде мы создаём объект scheduler при помощи модуля sched и определяем функцию run_me_every_minute так, чтобы она работала как запланированная задача. Затем применяем метод enter, чтобы он назначал задачу, и она начинала выполняться через 60 секунд, а при помощи метода run запускаем планировщик.
Конечно, sched прост и лёгок в использовании, но в нём не хватает некоторых продвинутых возможностей, имеющихся в других библиотеках для планирования задач.
Пакет schedule для python
Пакет schedule (см. Github, см. документация) позиционируется как «планировщик задач Python, сделанный для людей». Он предоставляет мощные и гибкие способы планировать задачи в Python. Им удобно пользоваться и у него простой API.
На следующем примере показано, как при помощи schedule запланировать задачу в приложении на FastAPI:
import schedule import time def run_me_every_minute(): print("Running every minute...") schedule.every(1).minutes.do(run_me_every_minute) while True: schedule.run_pending() time.sleep(1)
В вышеприведённом коде мы определяем функцию run_me_every_minute так, чтобы она выполнялась как запланированная задача, а при помощи метода every задаём выполнение этой задачи раз в минуту. Далее при помощи метода run_pending мы проверяем, какие задачи назначены, а при помощи метода sleep устанавливаем задержку цикла на 1 секунду.
Притом, что schedule прост в использовании, он ещё и предоставляет многие продвинутые возможности — в частности, обеспечивает обработку ошибок и подбор интервалов. Тем не менее, он может работать только в однопоточном режиме и для высокопроизводительных приложений не подойдёт.
Использование повторяющихся задач из fastapi-utils
Пакет fastapi-utils (см. Github, см. документация) позволяет просто и с удобством назначать повторяющиеся задачи в приложении, написанном на FastAPI. Для обработки асинхронных задач в нём используется библиотека asyncio.
Ниже показано, как при помощи fastapi-utils назначить задачу в приложении на FastAPI:
from fastapi import FastAPI from fastapi_utils.tasks import repeat_every app = FastAPI() @repeat_every(seconds=60) async def run_me_every_minute(): print("Running every minute...") if __name__ == "__main__": import uvicorn uvicorn.run(app)
В вышеприведённом коде при помощи декоратора repeat_every мы назначаем выполнение задачи с периодичностью в 60 секунд. Далее мы определяем функцию run_me_every_minute так, чтобы она выполнялась как назначенная задача.
Пакет fastapi-utils прост в использовании и хорошо интегрируется с FastAPI, но, возможно, не подойдёт для сравнительно сложных задач, связанных с планированием.
Использование пакета arq
Пакет arq (см. Github, см. документация) — ещё один мощный и гибкий инструмент для назначения задач в приложениях на FastAPI. Он выстроен на основе библиотеки asyncio и поддерживает продвинутые возможности: в частности, позволяет присваивать задачам приоритет и формулировать стратегии повторных попыток.
Вот как при помощи arq назначить задачу в приложении на FastAPI:
from fastapi import FastAPI from arq import create_pool from arq.jobs import Job app = FastAPI() async def run_me_every_minute(): print("Running every minute...") @app.on_event("startup") async def startup(): redis_settings = {"host": "localhost", "port": 6379} pool = await create_pool(redis_settings) job = Job(run_me_every_minute) await pool.enqueue_job(job) if __name__ == "__main__": import uvicorn uvicorn.run(app)
В вышеприведённом коде мы при помощи функции create_pool создаём пул соединений с Redis, который arq затем использует для хранения и назначения задач. Мы определяем функцию run_me_every_minute так, чтобы она выполнялась как назначенная задача, а также инкапсулируем её в объект Job, который специально для этого создаём. Наконец, применяем метод enqueue_job, чтобы назначить эту задачу на выполнение.
arq — мощная и гибкая библиотека для планирования, справляющаяся со сложными задачами. Но с её настройкой и конфигурацией приходится поработать серьёзнее, чем в случае с некоторыми другими библиотеками.
Работа с celery
Celery — это популярный инструмент для работы с очередью распределённых задач, и его удобно применять при работе с приложением на FastAPI. В Celery поддерживаются различные продвинутые возможности, в частности, расстановка приоритетов задач, стратегии повторных попыток и хранилище для результатов задач. Подробнее о периодических задачах рассказано в документации по Celery.
Вот как при помощи Celery можно запланировать задачу в приложении на FastAPI:
from fastapi import FastAPI from celery import Celery app = FastAPI() celery = Celery('tasks', broker='pyamqp://guest@localhost//') @celery.task def run_me_every_minute(): print("Running every minute...") @app.on_event("startup") async def startup(): celery.conf.beat_schedule = { "run-every-minute": { "task": "main.run_me_every_minute", "schedule": crontab(minute="*") } } celery.conf.timezone = "UTC" celery.conf.task_routes = {"main.run_me_every_minute": {"queue": "default"}} celery.autodiscover_tasks(["main"]) # Other Celery config if __name__ == "__main__": import uvicorn uvicorn.run(app)
В вышеприведённом коде при помощи метода celery.schedules.crontab мы назначаем для Celery дополнительные конфигурационные настройки, так, чтобы задача run_me_every_minute выполнялась каждую минуту. Также мы определим для Celery конфигурационные настройки, в которых указывается часовой пояс и настройки маршрутизации задач.
Celery — это мощная и насыщенная возможностями библиотека для планирования задач, справляющаяся с очень сложными сценариями, но достаточно сложная в настройке и конфигурировании.
Работа с Dramatiq
Dramatiq — это высокопроизводительная библиотека для обработки распределённых задач, применимая с приложениями на FastAPI. В ней также поддерживаются такие возможности как расстановка приоритетов задач, стратегии повторных попыток и хранилище для результатов.
Вот как при помощи Dramatiq можно запланировать задачу в приложении на FastAPI:
from fastapi import FastAPI from dramatiq import pipeline, actor, run_pipeline, cron app = FastAPI() @actor(cron("*/1 * * * *")) def run_me_every_minute(): print("Running every minute...") @app.on_event("startup") async def startup(): job = pipeline(run_me_every_minute.message()) run_pipeline(job) if __name__ == "__main__": import uvicorn uvicorn.run(app)
В вышеприведённом коде мы применяем метод dramatiq.cron, чтобы назначить ежеминутное выполнение задачи run_me_every_minute. Задача cron назначает для выполнения при помощи выражения */1 * * * *, означающего, что мы мы создаём конвейер, а затем задействуем методы pipeline и run_pipeline для назначения и выполнения задачи.
APScheduler
Библиотека APScheduler также очень мощная и пользуется популярностью при планировании задач на Python. В ней предусмотрено много продвинутых возможностей, в частности, планирование в стиле cron, интервальное планирование и многое другое.
Вот как при помощи APScheduler можно назначить, чтобы выбранная задача ежеминутно выполнялась в приложении на FastAPI:
from fastapi import FastAPI from apscheduler.schedulers.asyncio import AsyncIOScheduler app = FastAPI() scheduler = AsyncIOScheduler() async def run_me_every_minute(): print("Running every minute...") @app.on_event("startup") async def startup(): scheduler.add_job(run_me_every_minute, "interval", minutes=1) scheduler.start() if __name__ == "__main__": import uvicorn uvicorn.run(app)
В вышеприведённом коде мы задаём при помощи библиотеки APScheduler, чтобы задача run_me_every_minute выполнялась ежеминутно. Мы определяем объект планировщика, а далее при помощи метода add_job добавляем задачу в планировщик. Далее для запуска планировщика вызывается метод start.
Обратите внимание: асинхронный планировщик создаётся при помощи класса AsyncIOScheduler. Он необходим при использовании APScheduler с FastAPI, поскольку FastAPI – это асинхронный фреймворк.
В целом APScheduler — это мощная библиотека для планирования, в которой предоставляется множество продвинутых возможностей. Она совместима с FastAPI и может использоваться при планировании задач, когда они должны выполняться с разными интервалами.
Заключение
При работе с приложениями, написанными на Python с применением FastAPI есть множество инструментов для планирования задач, каждый со своими достоинствами и недостатками. Подходящая библиотека для планирования выбирается в зависимости от сложности вашего приложения и от того, какие именно возможности вам требуются.
Если вам нужна простая и удобная в использовании библиотека для планирования, то вам хорошо подойдут варианты schedule или fastapi-utils. Если вам требуются более продвинутые возможности и более значительная гибкость при работе, попробуйте arq, Celery или Dramatiq.
Независимо от того, на какой библиотеке вы остановитесь, можно значительно усовершенствовать функциональность и эффективность вашего приложения на FastAPI, если настроить в нём планирование задач. Надеюсь, что приведённые в этой статье примеры кода станут для вас хорошей отправной точкой, чтобы реализовать планирование задач в ваших проектах.
ссылка на оригинал статьи https://habr.com/ru/articles/920194/
Добавить комментарий