Зачем в 2026 году писать ещё один загрузчик видео
Казалось бы, тема заезженная до состояния покрышки камаза: «сделай бота, который качает ролики». На Хабре уже лежит десяток туториалов, на GitHub — сотни форков. Но если вы хоть раз открывали такой бот в проде — вы знаете, что 90% из них падают на втором пользователе, а оставшиеся 10% честно умирают на видео длиннее 15 минут.
Я полез в органическую выдачу Google, чтобы понять, почему люди вообще ищут Telegram-ботов, а не пользуются веб-сервисами. И выдача, честно говоря, удручает.
Мини-аналитика поисковой выдачи (или почему люди идут в Telegram)
Когда я из любопытства пошёл смотреть, что на самом деле гуглят живые люди вокруг темы «скачать видео с ютуба», картина сложилась довольно предсказуемая. Базовый запрос — это банальное «скачать видео с YouTube по ссылке», с вариациями раскладки и опечаток уровня «ютуб com». Дальше начинается сегментация по качеству: кому-то обязательно нужно 1080p (причём половина пишет «1080р» кириллицей, даже не замечая этого), а кому-то — голый MP4-файл без лишних обвесов.
Вторая ось — способ получения. Человек хочет бесплатно, онлайн и желательно без регистрации (в 2026-м форма регистрации пугает пользователя сильнее, чем баннер онлайн-казино, и я его понимаю). Именно под этот интент заточены первые десять строк выдачи: снаружи — аккуратный лендинг с большой кнопкой, внутри — три редиректа, попап-блокировщик, инсталлер Yandex.Browser Setup.exe и, если повезёт, бесплатный майнер в подарок. Суть всех этих запросов, как ни крути их семантически, сводится к одному — нажать одну кнопку и получить MP4 в хорошем качестве. Всё остальное — уже наши с вами инженерные проблемы. Потому что пользовоатель не хочет:
-
смотреть рекламу казино «Пин-Ап»;
-
ставить «плагин для браузера»;
-
регистрироваться, подтверждать почту и проходить капчу с пожарными гидрантами;
-
объяснять антивирусу, что
setup_downloader_pro_final2.exe— это якобы легитимно.
Именно поэтому Telegram-бот выигрывает у веб-сервисов как концепт: нет рекламы, нет регистрации, нет странных exe-шников. Прислал ссылку — получил файл. Всё.
Проблема одна: написать такой бот правильно — сложнее, чем кажется. Об этом и статья.
Стек: почему Python + aiogram 3 + yt-dlp
Коротко и по делу, без очередного сравнения с Go и Node.
-
Python 3.11+ — потому что
asyncioв 11-й ветке стал действительно быстрым,TaskGroupприехал, аtomllibвстроен. -
aiogram 3.x — современный роутер,
Dispatcherна DI, FSM из коробки, полноценный Type Hinting, middlewares, фильтры. Это неpython-telegram-botв его классическом перегруженном виде и не telebot образца 2018-го. -
yt-dlp — форк
youtube-dl, который живее всех живых. Поддерживает не только YouTube, но и около 1500 сайтов (TikTok, VK, Instagram, X/Twitter, Vimeo, Rutube и так далее). Парсеры обновляются чаще, чем я коммичу.
Архитектурно задача выглядит тривиально: принял сообщение → выдрал ссылку → вызвал yt-dlp → отправил файл. И вот здесь начинается самое интересное.
Главная проблема: блокирующий IO и смерть Event Loop
Новичок, написавший первого бота на aiogram, делает примерно так:
python
@router.message(F.text.startswith("http"))
async def download_handler(message: Message) -> None:
url = message.text
with YoutubeDL({"outtmpl": "video.mp4"}) as ydl:
ydl.download([url]) # 🔥 тут всё и умирает
await message.answer_video(FSInputFile("video.mp4"))
На одном пользователе это даже работает. На втором — бот превращается в тыкву.
Почему так происходит
asyncio работает в одном потоке — это кооперативная многозадачность. Event Loop жонглирует корутинами, переключаясь между ними на await. Пока одна корутина не встретила await или не завершилась — остальные ждут.
yt-dlp — полностью синхронная библиотека. Под капотом она:
-
Ходит HTTP-запросами через
urllib(блокирующий сокет). -
Парсит страницы, извлекает URL стримов.
-
Скачивает чанки через блокирующие операции IO.
-
Вызывает
ffmpegчерезsubprocess— это тоже блок.
Когда вы вызываете ydl.download([url]) в async def-хендлере, вы буквально замораживаете Event Loop на всё время скачивания. Представьте: пользователь А прислал ссылку на 2-часовой стрим в 4K. На 2 часа ваш бот перестаёт отвечать абсолютно всем — кнопки не нажимаются, /start игнорируется, middlewares молчат. И нет, GIL здесь не поможет: проблема даже не в нём, а в том, что у вас один поток для всей асинхронной машины.
Многие наивно думают: «ну так GIL же всё равно отпускается на IO, проблем быть не должно». Проблема есть — GIL отпускается на уровне интерпретатора CPython, но asyncio про это ничего не знает. Для Event Loop любой синхронный вызов — это чёрный ящик, из которого не возвращается управление.
Вывод: синхронный код в async-хендлере = DoS самого себя.
Архитектурное решение: выносим работу в поток
Решений у проблемы три:
-
asyncio.to_thread()— сахар поверхrun_in_executorс дефолтным пулом. Pythonic, ленив, подходит для 95% случаев. -
Кастомный
ThreadPoolExecutor— когда нужен контроль над количеством воркеров и очередью. -
ProcessPoolExecutor— когда есть реальный CPU-bound (нам не подходит,yt-dlp— это IO-bound).
Я беру вариант №2 — кастомный пул. Потому что если вы пустите всё в дефолтный пул через to_thread, то при 50 параллельных запросах у вас улетит в небеса и память, и сеть. Нужен rate limiting на уровне архитектуры.
Сервисный слой DownloaderService
python
from __future__ import annotations
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable
from yt_dlp import YoutubeDL
from yt_dlp.utils import DownloadError
logger = logging.getLogger(__name__)
@dataclass(slots=True, frozen=True)
class DownloadResult:
file_path: Path
title: str
duration: int
filesize: int
class DownloaderService:
"""
Сервис скачивания видео. Всю блокирующую работу выносит
в отдельный ThreadPoolExecutor, чтобы не мешать Event Loop.
"""
def __init__(
self,
download_dir: Path,
max_workers: int = 4,
) -> None:
self._download_dir = download_dir
self._download_dir.mkdir(parents=True, exist_ok=True)
# Ограничиваем количество одновременных скачиваний.
# Больше 4-8 смысла не имеет: упрёмся в сеть/диск/RAM.
self._executor = ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="ytdlp-worker",
)
async def download(
self,
url: str,
progress_hook: Callable[[dict[str, Any]], None] | None = None,
) -> DownloadResult:
"""
Асинхронная обёртка. Внутри — честный синхронный yt-dlp,
но исполняется в отдельном потоке.
"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
self._executor,
self._blocking_download,
url,
progress_hook,
)
def _blocking_download(
self,
url: str,
progress_hook: Callable[[dict[str, Any]], None] | None,
) -> DownloadResult:
"""Этот метод выполняется ВНЕ event loop, в рабочем потоке."""
ydl_opts: dict[str, Any] = {
# Лучшее mp4 до 1080p + лучший m4a, смёрженные в mp4.
"format": "bv*[height<=1080][ext=mp4]+ba[ext=m4a]/b[ext=mp4]/b",
"merge_output_format": "mp4",
"outtmpl": str(self._download_dir / "%(id)s.%(ext)s"),
"quiet": True,
"no_warnings": True,
"noprogress": True, # свой хук, стандартный stdout не нужен
"concurrent_fragment_downloads": 4,
"retries": 3,
}
if progress_hook is not None:
ydl_opts["progress_hooks"] = [progress_hook]
try:
with YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
file_path = Path(ydl.prepare_filename(info))
except DownloadError as e:
logger.warning("yt-dlp error for %s: %s", url, e)
raise
return DownloadResult(
file_path=file_path,
title=info.get("title", "video"),
duration=int(info.get("duration") or 0),
filesize=file_path.stat().st_size,
)
async def shutdown(self) -> None:
self._executor.shutdown(wait=True, cancel_futures=False)
Что здесь важно:
-
run_in_executorс кастомным пулом — мы не трогаем дефолтный executor, он нужен для других операций (to_threadу aiogram и библиотек). -
max_workers=4— это наш натуральный rate limit. Пятый пользователь подождёт в очереди, а не положит сервер. -
DownloadResult—frozendataclass соslots— потому что мы Senior-разработчики, а не аниматоры гифок. -
Формат селектор
bv*[height<=1080][ext=mp4]+ba[ext=m4a]— заслуживает отдельной статьи, но вкратце: берём лучшее видео mp4 до 1080p + лучшее аудио m4a и мёржим. Telegram любит mp4/H.264/AAC.
Хендлер теперь выглядит прилично:
python
@router.message(F.text.regexp(URL_REGEX))
async def handle_url(
message: Message,
downloader: DownloaderService, # инжектится через workflow_data
) -> None:
status = await message.answer("⏳ Принял, качаю...")
try:
result = await downloader.download(message.text)
except DownloadError:
await status.edit_text("❌ Не смог скачать. Проверьте ссылку.")
return
await message.answer_video(
FSInputFile(result.file_path),
caption=result.title[:1024],
)
await status.delete()
result.file_path.unlink(missing_ok=True)
Всё. Event Loop больше не блокируется, бот отвечает всем пользователям параллельно, а тяжёлую работу молча перемалывают 4 рабочих потока.
Но есть одна эстетическая проблема.
Прогресс-бар: как пробросить проценты из потока в корутину
Пользователь не любит смотреть на статичное «качаю...» минуту. Ему хочется видеть 45% → 72% → 89%. И вот тут начинается магия межпоточной коммуникации.
В чём подстава
yt-dlp даёт нам progress_hooks — синхронный коллбэк, который дёргается из рабочего потока на каждом чанке. Выглядит он так:
python
def hook(d: dict) -> None:
if d["status"] == "downloading":
print(d["_percent_str"]) # ' 45.3%'
Проблема: из этого коллбэка нельзя напрямую вызвать await message.edit_text(...). Мы в чужом потоке, там нет Event Loop. Попытка сделать asyncio.run() внутри хука — гарантированный способ получить RuntimeError и/или race condition.
Решение: asyncio.run_coroutine_threadsafe + throttling
Нам нужен мост между потоком и циклом. У asyncio есть ровно одна правильная функция для этого — run_coroutine_threadsafe. Она thread-safe планирует корутину в указанный Event Loop и возвращает concurrent.futures.Future.
И сразу второй момент: Telegram люто не любит, когда ему спамят editMessageText. Лимит — примерно одно редактирование в секунду на сообщение. Если вы будете дёргать API каждые 200 миллисекунд, словите 429 Too Many Requests и флуд-бан. Значит, нужен троттлинг.
Класс ProgressReporter
python
from __future__ import annotations
import asyncio
import time
from typing import Any
from aiogram.types import Message
class ProgressReporter:
"""
Мост между синхронным прогресс-хуком yt-dlp и асинхронным
редактированием сообщения в Telegram.
Ключевые фичи:
* Throttling (не чаще раз в N секунд) — чтобы не словить 429.
* Threadsafe-планирование корутин в основной Event Loop.
* Игнорирование ошибок редактирования (сообщение могло быть удалено).
"""
__slots__ = ("_message", "_loop", "_min_interval", "_last_update", "_last_text")
def __init__(
self,
message: Message,
loop: asyncio.AbstractEventLoop,
min_interval: float = 2.0,
) -> None:
self._message = message
self._loop = loop
self._min_interval = min_interval
self._last_update: float = 0.0
self._last_text: str = ""
def __call__(self, d: dict[str, Any]) -> None:
"""Этот метод вызывается yt-dlp из рабочего потока."""
status = d.get("status")
if status == "downloading":
text = self._format_downloading(d)
elif status == "finished":
text = "🔧 Обработка (ffmpeg)..."
else:
return
now = time.monotonic()
if now - self._last_update < self._min_interval:
return
if text == self._last_text:
return
self._last_update = now
self._last_text = text
# Планируем корутину в чужой event loop — thread-safe.
# Результат нас не интересует, ошибки глотаем.
asyncio.run_coroutine_threadsafe(self._safe_edit(text), self._loop)
async def _safe_edit(self, text: str) -> None:
try:
await self._message.edit_text(text)
except Exception:
# TelegramBadRequest (message is not modified),
# TelegramRetryAfter, message deleted и прочее.
# Прогресс-бар не стоит того, чтобы ронять задачу.
pass
@staticmethod
def _format_downloading(d: dict[str, Any]) -> str:
total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
downloaded = d.get("downloaded_bytes") or 0
speed = d.get("speed") or 0
if not total:
return f"⬇️ Скачано {downloaded / 1024 / 1024:.1f} MB"
percent = downloaded / total * 100
bar_len = 20
filled = int(bar_len * percent / 100)
bar = "█" * filled + "░" * (bar_len - filled)
speed_mb = speed / 1024 / 1024 if speed else 0
return (
f"⬇️ Загрузкаn"
f"<code>[{bar}] {percent:.1f}%</code>n"
f"Скорость: {speed_mb:.2f} MB/s"
)
Связываем всё вместе
Хендлер принимает окончательный вид:
python
@router.message(F.text.regexp(URL_REGEX))
async def handle_url(
message: Message,
downloader: DownloaderService,
) -> None:
status_msg = await message.answer("⏳ Подготовка...")
loop = asyncio.get_running_loop()
reporter = ProgressReporter(status_msg, loop, min_interval=2.0)
try:
result = await downloader.download(
message.text,
progress_hook=reporter,
)
except DownloadError:
await status_msg.edit_text("❌ Не удалось скачать. Ссылка битая или контент приватный.")
return
except Exception:
logger.exception("Unexpected error")
await status_msg.edit_text("💥 Внутренняя ошибка. Уже чиню.")
return
try:
await message.answer_video(
FSInputFile(result.file_path),
caption=f"🎬 {result.title}"[:1024],
supports_streaming=True,
)
await status_msg.delete()
finally:
result.file_path.unlink(missing_ok=True)
Что мы получили:
-
Event Loop свободен — любое количество пользователей качают параллельно (до лимита пула).
-
Прогресс обновляется плавно — не чаще раза в 2 секунды, что вписывается в лимиты Telegram.
-
Межпоточная коммуникация корректна — через
run_coroutine_threadsafe, а не через хаки сasyncio.new_event_loop()в каждом потоке. -
Ошибки не роняют задачу — редактирование обёрнуто в try/except, потому что прогресс-бар — вещь опциональная.
Подводные камни, о которых стоит знать
Быстрый чек-лист, чтобы вы не наступили на грабли после деплоя:
-
Лимит 50 МБ у ботов на
sendVideoчерез обычный Bot API. Решается либо self-hosted Bot API (лимит 2 ГБ), либо предварительной проверкой размера черезextract_info(download=False)и отказом на больших файлах. -
FFmpeg обязан быть в PATH. Без него
merge_output_formatне работает, и вы получите отдельно.mp4и.m4a. -
YouTube периодически требует cookies для возрастных и регионально-ограниченных видео. Опция
cookiefileвydl_optsспасает. -
Одновременная очистка файлов. Если два юзера прислали один и тот же ролик, а
outtmplзавязан на%(id)s, второй воркер может удалить файл раньше, чем первый отправит. Решается либо уникальным префиксом на задачу (UUID вouttmpl), либо файловой блокировкой. -
Бэкпрешер.
ThreadPoolExecutorимеет неограниченную очередь — если ваш бот вдруг попал в топ, сотни задач встанут в неё и съедят память. Оберните в семафор или вasyncio.Queueс ограничением. -
Shutdown. При остановке бота не забудьте вызвать
await downloader.shutdown()черезdp.shutdown(), иначе потоки повиснут.
Заключение и Proof of Concept
Ключевые выводы для тех, кто долистал:
-
Любую синхронную библиотеку в async-проекте нужно изолировать в thread pool.
yt-dlp,requests,psycopg2,PIL— всё это ядовито для Event Loop. -
asyncio.run_coroutine_threadsafe— единственный правильный способ гонять корутины из чужого потока. Никакихnew_event_loop, никакихasyncio.runвнутри коллбэка. -
Throttling обязателен на любой коммуникации с Telegram API из-под прогресса. Иначе — флуд-бан.
-
Архитектурно разделяйте слои: хендлер не должен знать про
yt-dlp, сервис не должен знать проMessage. Мы этого касались черезProgressReporterкак callable.
Всё описанное у меня крутится на скромной , работает с декабря, пережило апдейты YouTube, апдейты Telegram и несколько моих DDoS-самого-себя-экспериментов. Бот бесплатный, пока без рекламы и без того самого «введите номер для подтверждения».
Можете потыкать вживую, посмотреть, как работает и проверить скорость скачивания — проект живёт тут: @skachaesh_bot.
Если статья зашла — пишите в комментариях, про что ещё покопать: отдельно про self-hosted Telegram Bot API и обход лимита 50 МБ, про FSM и очереди задач на Redis, или про то, как прикрутить сюда Celery и в какой момент это становится оверинжинирингом.
Автор: katya_anonim
