Зачем в 2026 году писать ещё один загрузчик видео
Казалось бы, тема заезженная до состояния покрышки камаза: «сделай бота, который качает ролики». На Хабре уже лежит десяток туториалов, на GitHub — сотни форков. Но если вы хоть раз открывали такой бот в проде — вы знаете, что 90% из них падают на втором пользователе, а оставшиеся 10% честно умирают на видео длиннее 15 минут.
Я полез в органическую выдачу Google, чтобы понять, почему люди вообще ищут Telegram-ботов, а не пользуются веб-сервисами. И выдача, честно говоря, удручает.
Мини-аналитика поисковой выдачи (или почему люди идут в Telegram)
Когда я из любопытства пошёл смотреть, что на самом деле гуглят живые люди вокруг темы «скачать видео с ютуба», картина сложилась довольно предсказуемая. Базовый запрос — это банальное «скачать видео с YouTube по ссылке», с вариациями раскладки и опечаток уровня «ютуб com». Дальше начинается сегментация по качеству: кому-то обязательно нужно 1080p (причём половина пишет «1080р» кириллицей, даже не замечая этого), а кому-то — голый MP4-файл без лишних обвесов.
Вторая ось — способ получения. Человек хочет бесплатно, онлайн и желательно без регистрации (в 2026-м форма регистрации пугает пользователя сильнее, чем баннер онлайн-казино, и я его понимаю). Именно под этот интент заточены первые десять строк выдачи: снаружи — аккуратный лендинг с большой кнопкой, внутри — три редиректа, попап-блокировщик, инсталлер Yandex.Browser Setup.exe и, если повезёт, бесплатный майнер в подарок. Суть всех этих запросов, как ни крути их семантически, сводится к одному — нажать одну кнопку и получить MP4 в хорошем качестве. Всё остальное — уже наши с вами инженерные проблемы. Потому что пользовоатель не хочет:
-
смотреть рекламу казино «Пин-Ап»;
-
ставить «плагин для браузера»;
-
регистрироваться, подтверждать почту и проходить капчу с пожарными гидрантами;
-
объяснять антивирусу, что
setup_downloader_pro_final2.exe— это якобы легитимно.
Именно поэтому Telegram-бот выигрывает у веб-сервисов как концепт: нет рекламы, нет регистрации, нет странных exe-шников. Прислал ссылку — получил файл. Всё.
Проблема одна: написать такой бот правильно — сложнее, чем кажется. Об этом и статья.
Стек: почему Python + aiogram 3 + yt-dlp
Коротко и по делу, без очередного сравнения с Go и Node.
-
Python 3.11+ — потому что
asyncioв 11-й ветке стал действительно быстрым,TaskGroupприехал, аtomllibвстроен. -
aiogram 3.x — современный роутер,
Dispatcherна DI, FSM из коробки, полноценный Type Hinting, middlewares, фильтры. Это неpython-telegram-botв его классическом перегруженном виде и не telebot образца 2018-го. -
yt-dlp — форк
youtube-dl, который живее всех живых. Поддерживает не только YouTube, но и около 1500 сайтов (TikTok, VK, Instagram, X/Twitter, Vimeo, Rutube и так далее). Парсеры обновляются чаще, чем я коммичу.
Архитектурно задача выглядит тривиально: принял сообщение → выдрал ссылку → вызвал yt-dlp → отправил файл. И вот здесь начинается самое интересное.
Главная проблема: блокирующий IO и смерть Event Loop
Новичок, написавший первого бота на aiogram, делает примерно так:
python
@router.message(F.text.startswith("http"))async def download_handler(message: Message) -> None: url = message.text with YoutubeDL({"outtmpl": "video.mp4"}) as ydl: ydl.download([url]) # 🔥 тут всё и умирает await message.answer_video(FSInputFile("video.mp4"))
На одном пользователе это даже работает. На втором — бот превращается в тыкву.
Почему так происходит
asyncio работает в одном потоке — это кооперативная многозадачность. Event Loop жонглирует корутинами, переключаясь между ними на await. Пока одна корутина не встретила await или не завершилась — остальные ждут.
yt-dlp — полностью синхронная библиотека. Под капотом она:
-
Ходит HTTP-запросами через
urllib(блокирующий сокет). -
Парсит страницы, извлекает URL стримов.
-
Скачивает чанки через блокирующие операции IO.
-
Вызывает
ffmpegчерезsubprocess— это тоже блок.
Когда вы вызываете ydl.download([url]) в async def-хендлере, вы буквально замораживаете Event Loop на всё время скачивания. Представьте: пользователь А прислал ссылку на 2-часовой стрим в 4K. На 2 часа ваш бот перестаёт отвечать абсолютно всем — кнопки не нажимаются, /start игнорируется, middlewares молчат. И нет, GIL здесь не поможет: проблема даже не в нём, а в том, что у вас один поток для всей асинхронной машины.
Многие наивно думают: «ну так GIL же всё равно отпускается на IO, проблем быть не должно». Проблема есть — GIL отпускается на уровне интерпретатора CPython, но asyncio про это ничего не знает. Для Event Loop любой синхронный вызов — это чёрный ящик, из которого не возвращается управление.
Вывод: синхронный код в async-хендлере = DoS самого себя.
Архитектурное решение: выносим работу в поток
Решений у проблемы три:
-
asyncio.to_thread()— сахар поверхrun_in_executorс дефолтным пулом. Pythonic, ленив, подходит для 95% случаев. -
Кастомный
ThreadPoolExecutor— когда нужен контроль над количеством воркеров и очередью. -
ProcessPoolExecutor— когда есть реальный CPU-bound (нам не подходит,yt-dlp— это IO-bound).
Я беру вариант №2 — кастомный пул. Потому что если вы пустите всё в дефолтный пул через to_thread, то при 50 параллельных запросах у вас улетит в небеса и память, и сеть. Нужен rate limiting на уровне архитектуры.
Сервисный слой DownloaderService
python
from __future__ import annotationsimport asyncioimport loggingfrom concurrent.futures import ThreadPoolExecutorfrom dataclasses import dataclassfrom pathlib import Pathfrom typing import Any, Callablefrom yt_dlp import YoutubeDLfrom yt_dlp.utils import DownloadErrorlogger = logging.getLogger(__name__)@dataclass(slots=True, frozen=True)class DownloadResult: file_path: Path title: str duration: int filesize: intclass DownloaderService: """ Сервис скачивания видео. Всю блокирующую работу выносит в отдельный ThreadPoolExecutor, чтобы не мешать Event Loop. """ def __init__( self, download_dir: Path, max_workers: int = 4, ) -> None: self._download_dir = download_dir self._download_dir.mkdir(parents=True, exist_ok=True) # Ограничиваем количество одновременных скачиваний. # Больше 4-8 смысла не имеет: упрёмся в сеть/диск/RAM. self._executor = ThreadPoolExecutor( max_workers=max_workers, thread_name_prefix="ytdlp-worker", ) async def download( self, url: str, progress_hook: Callable[[dict[str, Any]], None] | None = None, ) -> DownloadResult: """ Асинхронная обёртка. Внутри — честный синхронный yt-dlp, но исполняется в отдельном потоке. """ loop = asyncio.get_running_loop() return await loop.run_in_executor( self._executor, self._blocking_download, url, progress_hook, ) def _blocking_download( self, url: str, progress_hook: Callable[[dict[str, Any]], None] | None, ) -> DownloadResult: """Этот метод выполняется ВНЕ event loop, в рабочем потоке.""" ydl_opts: dict[str, Any] = { # Лучшее mp4 до 1080p + лучший m4a, смёрженные в mp4. "format": "bv*[height<=1080][ext=mp4]+ba[ext=m4a]/b[ext=mp4]/b", "merge_output_format": "mp4", "outtmpl": str(self._download_dir / "%(id)s.%(ext)s"), "quiet": True, "no_warnings": True, "noprogress": True, # свой хук, стандартный stdout не нужен "concurrent_fragment_downloads": 4, "retries": 3, } if progress_hook is not None: ydl_opts["progress_hooks"] = [progress_hook] try: with YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) file_path = Path(ydl.prepare_filename(info)) except DownloadError as e: logger.warning("yt-dlp error for %s: %s", url, e) raise return DownloadResult( file_path=file_path, title=info.get("title", "video"), duration=int(info.get("duration") or 0), filesize=file_path.stat().st_size, ) async def shutdown(self) -> None: self._executor.shutdown(wait=True, cancel_futures=False)
Что здесь важно:
-
run_in_executorс кастомным пулом — мы не трогаем дефолтный executor, он нужен для других операций (to_threadу aiogram и библиотек). -
max_workers=4— это наш натуральный rate limit. Пятый пользователь подождёт в очереди, а не положит сервер. -
DownloadResult—frozendataclass соslots— потому что мы Senior-разработчики, а не аниматоры гифок. -
Формат селектор
bv*[height<=1080][ext=mp4]+ba[ext=m4a]— заслуживает отдельной статьи, но вкратце: берём лучшее видео mp4 до 1080p + лучшее аудио m4a и мёржим. Telegram любит mp4/H.264/AAC.
Хендлер теперь выглядит прилично:
python
@router.message(F.text.regexp(URL_REGEX))async def handle_url( message: Message, downloader: DownloaderService, # инжектится через workflow_data) -> None: status = await message.answer("⏳ Принял, качаю...") try: result = await downloader.download(message.text) except DownloadError: await status.edit_text("❌ Не смог скачать. Проверьте ссылку.") return await message.answer_video( FSInputFile(result.file_path), caption=result.title[:1024], ) await status.delete() result.file_path.unlink(missing_ok=True)
Всё. Event Loop больше не блокируется, бот отвечает всем пользователям параллельно, а тяжёлую работу молча перемалывают 4 рабочих потока.
Но есть одна эстетическая проблема.
Прогресс-бар: как пробросить проценты из потока в корутину
Пользователь не любит смотреть на статичное «качаю…» минуту. Ему хочется видеть 45% → 72% → 89%. И вот тут начинается магия межпоточной коммуникации.
В чём подстава
yt-dlp даёт нам progress_hooks — синхронный коллбэк, который дёргается из рабочего потока на каждом чанке. Выглядит он так:
python
def hook(d: dict) -> None: if d["status"] == "downloading": print(d["_percent_str"]) # ' 45.3%'
Проблема: из этого коллбэка нельзя напрямую вызвать await message.edit_text(...). Мы в чужом потоке, там нет Event Loop. Попытка сделать asyncio.run() внутри хука — гарантированный способ получить RuntimeError и/или race condition.
Решение: asyncio.run_coroutine_threadsafe + throttling
Нам нужен мост между потоком и циклом. У asyncio есть ровно одна правильная функция для этого — run_coroutine_threadsafe. Она thread-safe планирует корутину в указанный Event Loop и возвращает concurrent.futures.Future.
И сразу второй момент: Telegram люто не любит, когда ему спамят editMessageText. Лимит — примерно одно редактирование в секунду на сообщение. Если вы будете дёргать API каждые 200 миллисекунд, словите 429 Too Many Requests и флуд-бан. Значит, нужен троттлинг.
Класс ProgressReporter
python
from __future__ import annotationsimport asyncioimport timefrom typing import Anyfrom aiogram.types import Messageclass ProgressReporter: """ Мост между синхронным прогресс-хуком yt-dlp и асинхронным редактированием сообщения в Telegram. Ключевые фичи: * Throttling (не чаще раз в N секунд) — чтобы не словить 429. * Threadsafe-планирование корутин в основной Event Loop. * Игнорирование ошибок редактирования (сообщение могло быть удалено). """ __slots__ = ("_message", "_loop", "_min_interval", "_last_update", "_last_text") def __init__( self, message: Message, loop: asyncio.AbstractEventLoop, min_interval: float = 2.0, ) -> None: self._message = message self._loop = loop self._min_interval = min_interval self._last_update: float = 0.0 self._last_text: str = "" def __call__(self, d: dict[str, Any]) -> None: """Этот метод вызывается yt-dlp из рабочего потока.""" status = d.get("status") if status == "downloading": text = self._format_downloading(d) elif status == "finished": text = "🔧 Обработка (ffmpeg)..." else: return now = time.monotonic() if now - self._last_update < self._min_interval: return if text == self._last_text: return self._last_update = now self._last_text = text # Планируем корутину в чужой event loop — thread-safe. # Результат нас не интересует, ошибки глотаем. asyncio.run_coroutine_threadsafe(self._safe_edit(text), self._loop) async def _safe_edit(self, text: str) -> None: try: await self._message.edit_text(text) except Exception: # TelegramBadRequest (message is not modified), # TelegramRetryAfter, message deleted и прочее. # Прогресс-бар не стоит того, чтобы ронять задачу. pass @staticmethod def _format_downloading(d: dict[str, Any]) -> str: total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 downloaded = d.get("downloaded_bytes") or 0 speed = d.get("speed") or 0 if not total: return f"⬇️ Скачано {downloaded / 1024 / 1024:.1f} MB" percent = downloaded / total * 100 bar_len = 20 filled = int(bar_len * percent / 100) bar = "█" * filled + "░" * (bar_len - filled) speed_mb = speed / 1024 / 1024 if speed else 0 return ( f"⬇️ Загрузка\n" f"<code>[{bar}] {percent:.1f}%</code>\n" f"Скорость: {speed_mb:.2f} MB/s" )
Связываем всё вместе
Хендлер принимает окончательный вид:
python
@router.message(F.text.regexp(URL_REGEX))async def handle_url( message: Message, downloader: DownloaderService,) -> None: status_msg = await message.answer("⏳ Подготовка...") loop = asyncio.get_running_loop() reporter = ProgressReporter(status_msg, loop, min_interval=2.0) try: result = await downloader.download( message.text, progress_hook=reporter, ) except DownloadError: await status_msg.edit_text("❌ Не удалось скачать. Ссылка битая или контент приватный.") return except Exception: logger.exception("Unexpected error") await status_msg.edit_text("💥 Внутренняя ошибка. Уже чиню.") return try: await message.answer_video( FSInputFile(result.file_path), caption=f"🎬 {result.title}"[:1024], supports_streaming=True, ) await status_msg.delete() finally: result.file_path.unlink(missing_ok=True)
Что мы получили:
-
Event Loop свободен — любое количество пользователей качают параллельно (до лимита пула).
-
Прогресс обновляется плавно — не чаще раза в 2 секунды, что вписывается в лимиты Telegram.
-
Межпоточная коммуникация корректна — через
run_coroutine_threadsafe, а не через хаки сasyncio.new_event_loop()в каждом потоке. -
Ошибки не роняют задачу — редактирование обёрнуто в try/except, потому что прогресс-бар — вещь опциональная.
Подводные камни, о которых стоит знать
Быстрый чек-лист, чтобы вы не наступили на грабли после деплоя:
-
Лимит 50 МБ у ботов на
sendVideoчерез обычный Bot API. Решается либо self-hosted Bot API (лимит 2 ГБ), либо предварительной проверкой размера черезextract_info(download=False)и отказом на больших файлах. -
FFmpeg обязан быть в PATH. Без него
merge_output_formatне работает, и вы получите отдельно.mp4и.m4a. -
YouTube периодически требует cookies для возрастных и регионально-ограниченных видео. Опция
cookiefileвydl_optsспасает. -
Одновременная очистка файлов. Если два юзера прислали один и тот же ролик, а
outtmplзавязан на%(id)s, второй воркер может удалить файл раньше, чем первый отправит. Решается либо уникальным префиксом на задачу (UUID вouttmpl), либо файловой блокировкой. -
Бэкпрешер.
ThreadPoolExecutorимеет неограниченную очередь — если ваш бот вдруг попал в топ, сотни задач встанут в неё и съедят память. Оберните в семафор или вasyncio.Queueс ограничением. -
Shutdown. При остановке бота не забудьте вызвать
await downloader.shutdown()черезdp.shutdown(), иначе потоки повиснут.
Заключение и Proof of Concept
Ключевые выводы для тех, кто долистал:
-
Любую синхронную библиотеку в async-проекте нужно изолировать в thread pool.
yt-dlp,requests,psycopg2,PIL— всё это ядовито для Event Loop. -
asyncio.run_coroutine_threadsafe— единственный правильный способ гонять корутины из чужого потока. Никакихnew_event_loop, никакихasyncio.runвнутри коллбэка. -
Throttling обязателен на любой коммуникации с Telegram API из-под прогресса. Иначе — флуд-бан.
-
Архитектурно разделяйте слои: хендлер не должен знать про
yt-dlp, сервис не должен знать проMessage. Мы этого касались черезProgressReporterкак callable.
Всё описанное у меня крутится на скромной VPS, работает с декабря, пережило апдейты YouTube, апдейты Telegram и несколько моих DDoS-самого-себя-экспериментов. Бот бесплатный, пока без рекламы и без того самого «введите номер для подтверждения».
Можете потыкать вживую, посмотреть, как работает и проверить скорость скачивания — проект живёт тут: @skachaesh_bot.
Если статья зашла — пишите в комментариях, про что ещё покопать: отдельно про self-hosted Telegram Bot API и обход лимита 50 МБ, про FSM и очереди задач на Redis, или про то, как прикрутить сюда Celery и в какой момент это становится оверинжинирингом.
ссылка на оригинал статьи https://habr.com/ru/articles/1026238/