Асинхронный загрузчик видео на aiogram 3 и yt-dlp: как не положить Event Loop и прикрутить честный прогресс-бар

от автора

бот для скачивания видео с ютуб, телеграм, тикток

бот для скачивания видео с ютуб, телеграм, тикток

Зачем в 2026 году писать ещё один загрузчик видео

Казалось бы, тема заезженная до состояния покрышки камаза: «сделай бота, который качает ролики». На Хабре уже лежит десяток туториалов, на GitHub — сотни форков. Но если вы хоть раз открывали такой бот в проде — вы знаете, что 90% из них падают на втором пользователе, а оставшиеся 10% честно умирают на видео длиннее 15 минут.

Я полез в органическую выдачу Google, чтобы понять, почему люди вообще ищут Telegram-ботов, а не пользуются веб-сервисами. И выдача, честно говоря, удручает.

Мини-аналитика поисковой выдачи (или почему люди идут в Telegram)

Когда я из любопытства пошёл смотреть, что на самом деле гуглят живые люди вокруг темы «скачать видео с ютуба», картина сложилась довольно предсказуемая. Базовый запрос — это банальное «скачать видео с YouTube по ссылке», с вариациями раскладки и опечаток уровня «ютуб com». Дальше начинается сегментация по качеству: кому-то обязательно нужно 1080p (причём половина пишет «1080р» кириллицей, даже не замечая этого), а кому-то — голый MP4-файл без лишних обвесов.

Вторая ось — способ получения. Человек хочет бесплатно, онлайн и желательно без регистрации (в 2026-м форма регистрации пугает пользователя сильнее, чем баннер онлайн-казино, и я его понимаю). Именно под этот интент заточены первые десять строк выдачи: снаружи — аккуратный лендинг с большой кнопкой, внутри — три редиректа, попап-блокировщик, инсталлер Yandex.Browser Setup.exe и, если повезёт, бесплатный майнер в подарок. Суть всех этих запросов, как ни крути их семантически, сводится к одному — нажать одну кнопку и получить MP4 в хорошем качестве. Всё остальное — уже наши с вами инженерные проблемы. Потому что пользовоатель не хочет:

  • смотреть рекламу казино «Пин-Ап»;

  • ставить «плагин для браузера»;

  • регистрироваться, подтверждать почту и проходить капчу с пожарными гидрантами;

  • объяснять антивирусу, что setup_downloader_pro_final2.exe — это якобы легитимно.

Именно поэтому Telegram-бот выигрывает у веб-сервисов как концепт: нет рекламы, нет регистрации, нет странных exe-шников. Прислал ссылку — получил файл. Всё.

Проблема одна: написать такой бот правильно — сложнее, чем кажется. Об этом и статья.


Стек: почему Python + aiogram 3 + yt-dlp

Коротко и по делу, без очередного сравнения с Go и Node.

  • Python 3.11+ — потому что asyncio в 11-й ветке стал действительно быстрым, TaskGroup приехал, а tomllib встроен.

  • aiogram 3.x — современный роутер, Dispatcher на DI, FSM из коробки, полноценный Type Hinting, middlewares, фильтры. Это не python-telegram-bot в его классическом перегруженном виде и не telebot образца 2018-го.

  • yt-dlp — форк youtube-dl, который живее всех живых. Поддерживает не только YouTube, но и около 1500 сайтов (TikTok, VK, Instagram, X/Twitter, Vimeo, Rutube и так далее). Парсеры обновляются чаще, чем я коммичу.

Архитектурно задача выглядит тривиально: принял сообщение → выдрал ссылку → вызвал yt-dlp → отправил файл. И вот здесь начинается самое интересное.


Главная проблема: блокирующий IO и смерть Event Loop

Новичок, написавший первого бота на aiogram, делает примерно так:

python

@router.message(F.text.startswith("http"))async def download_handler(message: Message) -> None:    url = message.text    with YoutubeDL({"outtmpl": "video.mp4"}) as ydl:        ydl.download([url])  # 🔥 тут всё и умирает    await message.answer_video(FSInputFile("video.mp4"))

На одном пользователе это даже работает. На втором — бот превращается в тыкву.

Почему так происходит

asyncio работает в одном потоке — это кооперативная многозадачность. Event Loop жонглирует корутинами, переключаясь между ними на await. Пока одна корутина не встретила await или не завершилась — остальные ждут.

yt-dlpполностью синхронная библиотека. Под капотом она:

  1. Ходит HTTP-запросами через urllib (блокирующий сокет).

  2. Парсит страницы, извлекает URL стримов.

  3. Скачивает чанки через блокирующие операции IO.

  4. Вызывает ffmpeg через subprocess — это тоже блок.

Когда вы вызываете ydl.download([url]) в async def-хендлере, вы буквально замораживаете Event Loop на всё время скачивания. Представьте: пользователь А прислал ссылку на 2-часовой стрим в 4K. На 2 часа ваш бот перестаёт отвечать абсолютно всем — кнопки не нажимаются, /start игнорируется, middlewares молчат. И нет, GIL здесь не поможет: проблема даже не в нём, а в том, что у вас один поток для всей асинхронной машины.

Многие наивно думают: «ну так GIL же всё равно отпускается на IO, проблем быть не должно». Проблема есть — GIL отпускается на уровне интерпретатора CPython, но asyncio про это ничего не знает. Для Event Loop любой синхронный вызов — это чёрный ящик, из которого не возвращается управление.

Вывод: синхронный код в async-хендлере = DoS самого себя.


Архитектурное решение: выносим работу в поток

Решений у проблемы три:

  1. asyncio.to_thread() — сахар поверх run_in_executor с дефолтным пулом. Pythonic, ленив, подходит для 95% случаев.

  2. Кастомный ThreadPoolExecutor — когда нужен контроль над количеством воркеров и очередью.

  3. ProcessPoolExecutor — когда есть реальный CPU-bound (нам не подходит, yt-dlp — это IO-bound).

Я беру вариант №2 — кастомный пул. Потому что если вы пустите всё в дефолтный пул через to_thread, то при 50 параллельных запросах у вас улетит в небеса и память, и сеть. Нужен rate limiting на уровне архитектуры.

Сервисный слой DownloaderService

python

from __future__ import annotationsimport asyncioimport loggingfrom concurrent.futures import ThreadPoolExecutorfrom dataclasses import dataclassfrom pathlib import Pathfrom typing import Any, Callablefrom yt_dlp import YoutubeDLfrom yt_dlp.utils import DownloadErrorlogger = logging.getLogger(__name__)@dataclass(slots=True, frozen=True)class DownloadResult:    file_path: Path    title: str    duration: int    filesize: intclass DownloaderService:    """    Сервис скачивания видео. Всю блокирующую работу выносит    в отдельный ThreadPoolExecutor, чтобы не мешать Event Loop.    """    def __init__(        self,        download_dir: Path,        max_workers: int = 4,    ) -> None:        self._download_dir = download_dir        self._download_dir.mkdir(parents=True, exist_ok=True)        # Ограничиваем количество одновременных скачиваний.        # Больше 4-8 смысла не имеет: упрёмся в сеть/диск/RAM.        self._executor = ThreadPoolExecutor(            max_workers=max_workers,            thread_name_prefix="ytdlp-worker",        )    async def download(        self,        url: str,        progress_hook: Callable[[dict[str, Any]], None] | None = None,    ) -> DownloadResult:        """        Асинхронная обёртка. Внутри — честный синхронный yt-dlp,        но исполняется в отдельном потоке.        """        loop = asyncio.get_running_loop()        return await loop.run_in_executor(            self._executor,            self._blocking_download,            url,            progress_hook,        )    def _blocking_download(        self,        url: str,        progress_hook: Callable[[dict[str, Any]], None] | None,    ) -> DownloadResult:        """Этот метод выполняется ВНЕ event loop, в рабочем потоке."""        ydl_opts: dict[str, Any] = {            # Лучшее mp4 до 1080p + лучший m4a, смёрженные в mp4.            "format": "bv*[height<=1080][ext=mp4]+ba[ext=m4a]/b[ext=mp4]/b",            "merge_output_format": "mp4",            "outtmpl": str(self._download_dir / "%(id)s.%(ext)s"),            "quiet": True,            "no_warnings": True,            "noprogress": True,  # свой хук, стандартный stdout не нужен            "concurrent_fragment_downloads": 4,            "retries": 3,        }        if progress_hook is not None:            ydl_opts["progress_hooks"] = [progress_hook]        try:            with YoutubeDL(ydl_opts) as ydl:                info = ydl.extract_info(url, download=True)                file_path = Path(ydl.prepare_filename(info))        except DownloadError as e:            logger.warning("yt-dlp error for %s: %s", url, e)            raise        return DownloadResult(            file_path=file_path,            title=info.get("title", "video"),            duration=int(info.get("duration") or 0),            filesize=file_path.stat().st_size,        )    async def shutdown(self) -> None:        self._executor.shutdown(wait=True, cancel_futures=False)

Что здесь важно:

  • run_in_executor с кастомным пулом — мы не трогаем дефолтный executor, он нужен для других операций (to_thread у aiogram и библиотек).

  • max_workers=4 — это наш натуральный rate limit. Пятый пользователь подождёт в очереди, а не положит сервер.

  • DownloadResultfrozen dataclass со slots — потому что мы Senior-разработчики, а не аниматоры гифок.

  • Формат селектор bv*[height<=1080][ext=mp4]+ba[ext=m4a] — заслуживает отдельной статьи, но вкратце: берём лучшее видео mp4 до 1080p + лучшее аудио m4a и мёржим. Telegram любит mp4/H.264/AAC.

Хендлер теперь выглядит прилично:

python

@router.message(F.text.regexp(URL_REGEX))async def handle_url(    message: Message,    downloader: DownloaderService,  # инжектится через workflow_data) -> None:    status = await message.answer("⏳ Принял, качаю...")    try:        result = await downloader.download(message.text)    except DownloadError:        await status.edit_text("❌ Не смог скачать. Проверьте ссылку.")        return    await message.answer_video(        FSInputFile(result.file_path),        caption=result.title[:1024],    )    await status.delete()    result.file_path.unlink(missing_ok=True)

Всё. Event Loop больше не блокируется, бот отвечает всем пользователям параллельно, а тяжёлую работу молча перемалывают 4 рабочих потока.

Но есть одна эстетическая проблема.


Прогресс-бар: как пробросить проценты из потока в корутину

Пользователь не любит смотреть на статичное «качаю…» минуту. Ему хочется видеть 45% → 72% → 89%. И вот тут начинается магия межпоточной коммуникации.

В чём подстава

yt-dlp даёт нам progress_hooksсинхронный коллбэк, который дёргается из рабочего потока на каждом чанке. Выглядит он так:

python

def hook(d: dict) -> None:    if d["status"] == "downloading":        print(d["_percent_str"])  # '  45.3%'

Проблема: из этого коллбэка нельзя напрямую вызвать await message.edit_text(...). Мы в чужом потоке, там нет Event Loop. Попытка сделать asyncio.run() внутри хука — гарантированный способ получить RuntimeError и/или race condition.

Решение: asyncio.run_coroutine_threadsafe + throttling

Нам нужен мост между потоком и циклом. У asyncio есть ровно одна правильная функция для этого — run_coroutine_threadsafe. Она thread-safe планирует корутину в указанный Event Loop и возвращает concurrent.futures.Future.

И сразу второй момент: Telegram люто не любит, когда ему спамят editMessageText. Лимит — примерно одно редактирование в секунду на сообщение. Если вы будете дёргать API каждые 200 миллисекунд, словите 429 Too Many Requests и флуд-бан. Значит, нужен троттлинг.

Класс ProgressReporter

python

from __future__ import annotationsimport asyncioimport timefrom typing import Anyfrom aiogram.types import Messageclass ProgressReporter:    """    Мост между синхронным прогресс-хуком yt-dlp и асинхронным    редактированием сообщения в Telegram.    Ключевые фичи:      * Throttling (не чаще раз в N секунд) — чтобы не словить 429.      * Threadsafe-планирование корутин в основной Event Loop.      * Игнорирование ошибок редактирования (сообщение могло быть удалено).    """    __slots__ = ("_message", "_loop", "_min_interval", "_last_update", "_last_text")    def __init__(        self,        message: Message,        loop: asyncio.AbstractEventLoop,        min_interval: float = 2.0,    ) -> None:        self._message = message        self._loop = loop        self._min_interval = min_interval        self._last_update: float = 0.0        self._last_text: str = ""    def __call__(self, d: dict[str, Any]) -> None:        """Этот метод вызывается yt-dlp из рабочего потока."""        status = d.get("status")        if status == "downloading":            text = self._format_downloading(d)        elif status == "finished":            text = "🔧 Обработка (ffmpeg)..."        else:            return        now = time.monotonic()        if now - self._last_update < self._min_interval:            return        if text == self._last_text:            return        self._last_update = now        self._last_text = text        # Планируем корутину в чужой event loop — thread-safe.        # Результат нас не интересует, ошибки глотаем.        asyncio.run_coroutine_threadsafe(self._safe_edit(text), self._loop)    async def _safe_edit(self, text: str) -> None:        try:            await self._message.edit_text(text)        except Exception:            # TelegramBadRequest (message is not modified),            # TelegramRetryAfter, message deleted и прочее.            # Прогресс-бар не стоит того, чтобы ронять задачу.            pass    @staticmethod    def _format_downloading(d: dict[str, Any]) -> str:        total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0        downloaded = d.get("downloaded_bytes") or 0        speed = d.get("speed") or 0        if not total:            return f"⬇️ Скачано {downloaded / 1024 / 1024:.1f} MB"        percent = downloaded / total * 100        bar_len = 20        filled = int(bar_len * percent / 100)        bar = "█" * filled + "░" * (bar_len - filled)        speed_mb = speed / 1024 / 1024 if speed else 0        return (            f"⬇️ Загрузка\n"            f"<code>[{bar}] {percent:.1f}%</code>\n"            f"Скорость: {speed_mb:.2f} MB/s"        )

Связываем всё вместе

Хендлер принимает окончательный вид:

python

@router.message(F.text.regexp(URL_REGEX))async def handle_url(    message: Message,    downloader: DownloaderService,) -> None:    status_msg = await message.answer("⏳ Подготовка...")    loop = asyncio.get_running_loop()    reporter = ProgressReporter(status_msg, loop, min_interval=2.0)    try:        result = await downloader.download(            message.text,            progress_hook=reporter,        )    except DownloadError:        await status_msg.edit_text("❌ Не удалось скачать. Ссылка битая или контент приватный.")        return    except Exception:        logger.exception("Unexpected error")        await status_msg.edit_text("💥 Внутренняя ошибка. Уже чиню.")        return    try:        await message.answer_video(            FSInputFile(result.file_path),            caption=f"🎬 {result.title}"[:1024],            supports_streaming=True,        )        await status_msg.delete()    finally:        result.file_path.unlink(missing_ok=True)

Что мы получили:

  1. Event Loop свободен — любое количество пользователей качают параллельно (до лимита пула).

  2. Прогресс обновляется плавно — не чаще раза в 2 секунды, что вписывается в лимиты Telegram.

  3. Межпоточная коммуникация корректна — через run_coroutine_threadsafe, а не через хаки с asyncio.new_event_loop() в каждом потоке.

  4. Ошибки не роняют задачу — редактирование обёрнуто в try/except, потому что прогресс-бар — вещь опциональная.


Подводные камни, о которых стоит знать

Быстрый чек-лист, чтобы вы не наступили на грабли после деплоя:

  • Лимит 50 МБ у ботов на sendVideo через обычный Bot API. Решается либо self-hosted Bot API (лимит 2 ГБ), либо предварительной проверкой размера через extract_info(download=False) и отказом на больших файлах.

  • FFmpeg обязан быть в PATH. Без него merge_output_format не работает, и вы получите отдельно .mp4 и .m4a.

  • YouTube периодически требует cookies для возрастных и регионально-ограниченных видео. Опция cookiefile в ydl_opts спасает.

  • Одновременная очистка файлов. Если два юзера прислали один и тот же ролик, а outtmpl завязан на %(id)s, второй воркер может удалить файл раньше, чем первый отправит. Решается либо уникальным префиксом на задачу (UUID в outtmpl), либо файловой блокировкой.

  • Бэкпрешер. ThreadPoolExecutor имеет неограниченную очередь — если ваш бот вдруг попал в топ, сотни задач встанут в неё и съедят память. Оберните в семафор или в asyncio.Queue с ограничением.

  • Shutdown. При остановке бота не забудьте вызвать await downloader.shutdown() через dp.shutdown(), иначе потоки повиснут.


Заключение и Proof of Concept

Ключевые выводы для тех, кто долистал:

  • Любую синхронную библиотеку в async-проекте нужно изолировать в thread pool. yt-dlp, requests, psycopg2, PIL — всё это ядовито для Event Loop.

  • asyncio.run_coroutine_threadsafe — единственный правильный способ гонять корутины из чужого потока. Никаких new_event_loop, никаких asyncio.run внутри коллбэка.

  • Throttling обязателен на любой коммуникации с Telegram API из-под прогресса. Иначе — флуд-бан.

  • Архитектурно разделяйте слои: хендлер не должен знать про yt-dlp, сервис не должен знать про Message. Мы этого касались через ProgressReporter как callable.

Всё описанное у меня крутится на скромной VPS, работает с декабря, пережило апдейты YouTube, апдейты Telegram и несколько моих DDoS-самого-себя-экспериментов. Бот бесплатный, пока без рекламы и без того самого «введите номер для подтверждения».

Можете потыкать вживую, посмотреть, как работает и проверить скорость скачивания — проект живёт тут: @skachaesh_bot.

интерфейс бота для скачивания с ютуб в телеграм

интерфейс бота для скачивания с ютуб в телеграм

Если статья зашла — пишите в комментариях, про что ещё покопать: отдельно про self-hosted Telegram Bot API и обход лимита 50 МБ, про FSM и очереди задач на Redis, или про то, как прикрутить сюда Celery и в какой момент это становится оверинжинирингом.

ссылка на оригинал статьи https://habr.com/ru/articles/1026238/