Последние несколько лет async вообще и asyncio в частности в питоне все больше набирают популярность и их все чаще используют. При этом иногда забывают о принципе KISS (Keep it simple, stupid) и о том, какие вообще проблемы решает асинхронный код и зачем он нужен. В этой статье я бы хотел описать пример, когда задачу можно и, на мой взгляд, нужно решать без использования async. И вообще, практически без всего.
Рассмотрим задачу.
Для начала подключим к компьютеру (ноутбуку) одну вэб-камеру. Далее нам нужно с этой камеры раз в 10 секунд запрашивать кадр и сохранять его. Для простоты примера будем сохранять кадр на диск. Эта задача простая и решение тоже будет простым, даже примитивным я бы сказал.
import time import cv2 def main(): source = 0 frame_file_path = '<path to a frame file>.jpg' cap = cv2.VideoCapture(source) while True: ret, frame = cap.read() if not ret: print('Failed to get a frame') break cv2.imwrite(frame_file_path, frame) time.sleep(10) cap.release() if __name__ == '__main__': main()
Чем хорош данный скрипт? Тем, что он прост, понятен и надежен как лом и не требует никаких внешних инструментов типа cron-а.
Теперь немного масштабируем наш пример: добавим еще одну камеру и скажем, что с нее нам нужны кадры каждые 15 секунд. Если на проекте вы активно пользуетесь asyncio, то можете по инерции поступить следующим образом: сказать, что раз задача по существу I/O Bound, а камер стало больше, то это работа для asyncio. Затем взять какой-нибудь планировщик задач типа APScheduler, который поддерживает asyncio и с его помощью и помощью лома накидать что-то вроде такого.
import asyncio from functools import partial import cv2 from apscheduler.schedulers.asyncio import AsyncIOScheduler async def job(source, target_file_path): cap = cv2.VideoCapture(source) ret, frame = await asyncio.get_event_loop().run_in_executor(None, partial(cap.read, cap)) if not ret: print('Failed to get a frame') return await asyncio.get_event_loop().run_in_executor(None, partial(cv2.imwrite, target_file_path, frame)) def main(): frame_0_file_path = '' frame_1_file_path = '' scheduler = AsyncIOScheduler() scheduler.add_job(job, 'interval', seconds=10, kwargs={'source': 0, 'target_file_path': frame_0_file_path}) scheduler.add_job(job, 'interval', seconds=15, kwargs={'source': 1, 'target_file_path': frame_1_file_path}) scheduler.start() try: asyncio.get_event_loop().run_forever() except (KeyboardInterrupt, SystemExit): pass if __name__ == '__main__': main()
На первый взгляд все неплохо, но это на первый взгляд. Помимо дополнительной внешней зависимости мы собрали под капотом корутины, asyncio с ивент лупом и потоки для запуска синхронных функций асинхронным образом. И теперь вместо топорной и надежной программки у нас есть целый технический букет, с которым можно повозиться и от этого кайфануть. Но оно нам надо? Мне вот как инженеру не очень 🙂 Если вспомнить, что async, ивент луп и вот это все призваны при написании I/O Bound приложений экономить ресурсы при большой нагрузке, а нагрузки у нас нет, да и камер будет конечное и вполне осмысливаемое число, то могут закрасться смутные сомнения. А нужен ли нам там asyncio тут вообще? И можно прийти к выводу, что не нужен и что вместо запуска асинхронных задач на ивент лупе, можно воспользоваться классом apscheduler.executors.pool.ProcessPoolExecutor, который позволит нам запускать наши джобы в процессах. Есть еще класс, который позволяет нам запускать джобы в потоках. Но сомнения у нас смутные, поэтому мы его тоже отбросим. Получится что-то вроде такого:
import cv2 from apscheduler.schedulers.blocking import BlockingScheduler def job(source, target_file_path): cap = cv2.VideoCapture(source) ret, frame = cap.read() if not ret: print('Failed to get a frame') return cv2.imwrite(target_file_path, frame) def main(): frame_0_file_path = '' frame_1_file_path = '' scheduler = BlockingScheduler({ 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ProcessPoolExecutor', 'max_workers': 2 } }) scheduler.add_job(job, 'interval', seconds=10, kwargs={'source': 0, 'target_file_path': frame_0_file_path}) scheduler.add_job(job, 'interval', seconds=15, kwargs={'source': 1, 'target_file_path': frame_1_file_path}) scheduler.start() if __name__ == '__main__': main()
Так намного лучше: нет потоков, нет asyncio с ивент лупом не по делу. Но все еще не идеал, ибо есть APScheduler с какими-то внутренним механизмами. Оно нам надо? Мне — нет. Наше решение для одной камеры было максимально простым, тупым, надежным и имело всего одну зависимость от которой, увы, не убежишь. Почему бы нам тогда не вспомнить про принцип KISS еще разок и не взять первую версию программы, не параметризовать ее и не запускать ее для каждой камеры? Этот подход мне лично нравится больше всего: минимум усилий (если подумать о нем сразу), максимум надежности (я как инженер люблю все надежное) и максимально просто сопровождение, а точнее его отсутствие 🙂
В итоге получаем следующее:
import argparse import time import cv2 def main(source: int, frame_file_path: str, interval: int): cap = cv2.VideoCapture(source) while True: ret, frame = cap.read() if not ret: print('Failed to get a frame') break cv2.imwrite(frame_file_path, frame) time.sleep(interval) cap.release() if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--source', type=int) parser.add_argument('--target-frame-path', type=str, dest='target_frame_path') parser.add_argument('--interval', type=int) args = parser.parse_args() main(args.source, args.target_frame_path, args.interval)
Такое же простое, надежное и понятное решение как лом. А против лома, как известно, приема нет 🙂
ссылка на оригинал статьи https://habr.com/ru/post/669690/
Добавить комментарий