Расскажу, как собрал бота для AI-суммаризации Telegram-каналов: архитектура, выбор LLM-провайдера, оптимизация скорости и неочевидные проблемы при деплое на российский .
Проблема
Подписан на 50+ Telegram-каналов. Каждое утро - 200+ непрочитанных сообщений. Читаю 10%, остальное скроллю. Классический information overload.
Идея: пусть LLM читает посты и присылает выжимку. Но не просто "саммари", а двухступенчатый подход:
-
Real Topic - о чём пост на самом деле (1-2 предложения)
-
TL;DR - полная выжимка с ключевыми тезисами
Логика: даже саммари 50 постов - много. А пробежать глазами "о чём это" - секунды.
Архитектура

Почему Telethon, а не Bot API
Bot API не умеет читать сообщения из каналов, на которые бот не добавлен администратором. Telethon работает через MTProto как обычный клиент - может читать любые публичные каналы.
python
from telethon import TelegramClient
client = TelegramClient('session', api_id, api_hash)
async def get_posts(channel_username: str, limit: int = 20):
entity = await client.get_entity(channel_username)
posts = []
async for message in client.iter_messages(entity, limit=limit):
if message.text and len(message.text) > 50:
posts.append({
'id': message.id,
'text': message.text,
'date': message.date,
'url': f'https://t.me/{channel_username}/{message.id}'
})
return posts
Стек
|
Компонент |
Технология |
Почему |
|---|---|---|
|
Bot Framework |
aiogram 3.x |
Async, типизация, активная разработка |
|
Channel Parser |
Telethon |
MTProto, читает любые публичные каналы |
|
LLM |
DeepSeek API |
Работает из РФ, дёшево ($0.14/1M input) |
|
Database |
SQLite + aiosqlite |
Достаточно для MVP, zero config |
|
Scheduler |
APScheduler |
Простой, async-совместимый |
|
Config |
Pydantic Settings |
Валидация, типизация, .env из коробки |
Промпт для анализа постов
python
ANALYZE_POST_PROMPT = """Проанализируй пост из Telegram-канала и верни JSON.
Канал: {channel_name}
Текст поста:
---
{post_content}
---
Верни JSON строго в таком формате:
{{
"real_topic": "1-2 предложения о чём НА САМОМ ДЕЛЕ этот пост",
"tldr": "Краткое изложение в 2-4 предложениях с главными фактами",
"key_insights": ["ключевой инсайт 1", "ключевой инсайт 2"],
"relevance_score": 7,
"content_type": "news"
}}
Где content_type: "news", "opinion", "tutorial", "announcement", "other"
Отвечай ТОЛЬКО валидным JSON без markdown-разметки."""
Ключевые моменты:
-
Явное указание формата вывода (JSON)
-
Примеры полей прямо в промпте
-
Ограничение на markdown - иначе LLM оборачивает JSON в
json
Грабля #1: Groq не работает из России
Изначально выбрал Groq - бесплатный, быстрый (Llama 3.3 70B за ~1 сек). Локально всё работало. После деплоя на Timeweb (российский 403 Forbidden.
Сравнение провайдеров
|
Провайдер |
Скорость |
Доступ из РФ |
Цена (1M input) |
Цена (1M output) |
|---|---|---|---|---|
|
Groq |
~1 сек |
❌ Blocked |
Бесплатно (лимиты) |
Бесплатно |
|
OpenAI |
~2-3 сек |
❌ Blocked |
$2.50 (GPT-4o-mini) |
$10.00 |
|
DeepSeek |
~5-10 сек |
✅ Работает |
$0.14 |
$0.28 |
|
Mistral |
~3-5 сек |
✅ Работает |
$0.25 |
$0.25 |
Выбрал DeepSeek - работает из России, адекватная цена, качество на уровне GPT-4o-mini.
Универсальный клиент
Сделал клиент с поддержкой разных провайдеров через OpenAI-совместимый API:
python
from openai import AsyncOpenAI
class LLMClient:
PROVIDERS = {
"deepseek": {
"base_url": "https://api.deepseek.com",
"default_model": "deepseek-chat"
},
"groq": {
"base_url": "https://api.groq.com/openai/v1",
"default_model": "llama-3.3-70b-versatile"
},
}
def __init__(self, provider: str, api_key: str):
config = self.PROVIDERS[provider]
self.client = AsyncOpenAI(
base_url=config["base_url"],
api_key=api_key
)
self.model = config["default_model"]
async def complete(self, prompt: str, system: str = None) -> str:
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.3
)
return response.choices[0].message.content
Грабля #2: Дайджест генерируется 2 минуты
После перехода на DeepSeek время ответа выросло с ~1 сек (Groq) до ~8-10 сек. При 9 постах последовательная обработка занимала 80+ секунд.
Диагностика
Добавил логирование времени:
python
import time
async def analyze_post(self, content: str, channel: str):
start = time.time()
result = await self.client.complete(...)
elapsed = time.time() - start
logger.info(f"LLM call took {elapsed:.2f}s for {channel}")
return result
Вывод:
LLM call took 5.39s for channel: channel_1
LLM call took 8.57s for channel: channel_1
LLM call took 9.12s for channel: channel_2
...
Total: 86.3s for 9 posts
Решение: параллельная обработка
python
import asyncio
async def analyze_batch(
self,
posts: list[dict],
concurrency: int = 5
) -> list[dict]:
semaphore = asyncio.Semaphore(concurrency)
async def process_one(post: dict):
async with semaphore:
return await self.analyze_post(
content=post["content"],
channel=post["channel"]
)
tasks = [process_one(post) for post in posts]
return await asyncio.gather(*tasks)
Результаты
|
Режим |
Время (9 постов) |
Ускорение |
|---|---|---|
|
Последовательно |
86 сек |
— |
|
concurrency=3 |
28 сек |
3x |
|
concurrency=5 |
19 сек |
4.5x |
Семафор нужен, чтобы не упереться в rate limit API.
Грабля #3: Один канал забивает весь дайджест
Пользователь добавил 3 канала:
-
Канал A: 20 постов/день
-
Канал B: 5 постов/день
-
Канал C: 3 поста/день
При лимите 15 постов на дайджест канал C не попадал вообще - посты собирались в порядке добавления каналов.
Алгоритм равномерного распределения
python
def distribute_posts_evenly(
posts_by_channel: dict[str, list],
max_total: int = 30
) -> list:
"""
Round-robin распределение постов из разных каналов.
Args:
posts_by_channel: {"channel_1": [post1, post2], ...}
max_total: максимум постов в итоговом списке
Returns:
Список постов, равномерно распределённых по каналам
"""
channels = list(posts_by_channel.keys())
if not channels:
return []
fair_share = max_total // len(channels)
result = []
# Фаза 1: берём по fair_share из каждого канала
for channel in channels:
posts = posts_by_channel[channel][:fair_share]
for post in posts:
post['_channel'] = channel
result.extend(posts)
# Фаза 2: добираем оставшиеся слоты
remaining = max_total - len(result)
if remaining > 0:
for channel in channels:
leftover = posts_by_channel[channel][fair_share:]
for post in leftover[:remaining]:
post['_channel'] = channel
result.append(post)
remaining -= 1
if remaining <= 0:
break
if remaining <= 0:
break
return result
Теперь при 3 каналах и лимите 30: каждый получает минимум 10 слотов.
Модель данных
python
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, BigInteger
from sqlalchemy.orm import relationship, declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
telegram_id = Column(BigInteger, unique=True, index=True)
digest_time = Column(String, default="07:00") # HH:MM
timezone = Column(String, default="Europe/Moscow")
is_active = Column(Boolean, default=True)
subscriptions = relationship("UserChannel", back_populates="user")
class Channel(Base):
__tablename__ = "channels"
id = Column(Integer, primary_key=True)
username = Column(String, unique=True, index=True) # без @
title = Column(String, nullable=True)
class UserChannel(Base):
"""Many-to-many: пользователь <-> канал"""
__tablename__ = "user_channels"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"))
channel_id = Column(Integer, ForeignKey("channels.id"))
added_at = Column(DateTime, default=datetime.utcnow)
user = relationship("User", back_populates="subscriptions")
channel = relationship("Channel")
SQLite с WAL-режимом справляется с конкурентными запросами:
python
async with engine.begin() as conn:
await conn.execute(text("PRAGMA journal_mode=WAL"))
Планировщик ежедневных дайджестов
python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
scheduler = AsyncIOScheduler()
async def schedule_user_digests():
"""Планирует дайджест для каждого пользователя на его время"""
users = await get_active_users()
for user in users:
hour, minute = map(int, user.digest_time.split(':'))
scheduler.add_job(
send_digest,
CronTrigger(hour=hour, minute=minute, timezone=user.timezone),
args=[user.telegram_id],
id=f"digest_{user.telegram_id}",
replace_existing=True
)
async def send_digest(telegram_id: int):
"""Генерирует и отправляет дайджест пользователю"""
user = await get_user(telegram_id)
channels = await get_user_channels(user)
# Собираем посты
all_posts = {}
for channel in channels:
posts = await parser.get_posts(channel.username, hours_back=24)
all_posts[channel.username] = posts
# Равномерно распределяем
distributed = distribute_posts_evenly(all_posts, max_total=30)
# Анализируем через LLM (параллельно)
analyzed = await processor.analyze_batch(distributed, concurrency=5)
# Формируем и отправляем
digest_text = format_digest(analyzed)
await bot.send_message(telegram_id, digest_text)
Деплой
Systemd unit
ini
# /etc/systemd/system/briefka.service
[Unit]
Description=Briefka Telegram Bot
After=network.target
[Service]
Type=simple
User=briefka
WorkingDirectory=/opt/briefka
ExecStart=/opt/briefka/venv/bin/python scripts/run_bot.py
Restart=always
RestartSec=10
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
Бэкап базы (cron)
bash
#!/bin/bash
# /opt/briefka/scripts/backup.sh
BACKUP_DIR="/opt/briefka/backups"
DB_PATH="/opt/briefka/briefka.db"
DATE=$(date +%Y%m%d_%H%M%S)
mkdir -p $BACKUP_DIR
cp $DB_PATH "$BACKUP_DIR/briefka_$DATE.db"
# Удаляем бэкапы старше 7 дней
find $BACKUP_DIR -name "*.db" -mtime +7 -delete
cron
0 3 * * * /opt/briefka/scripts/backup.sh >> /opt/briefka/logs/backup.log 2>&1
Метрики
|
Параметр |
Значение |
|---|---|
|
Время генерации дайджеста (10 постов) |
15-20 сек |
|
Потребление RAM |
~95 MB |
|
Стоимость |
530 ₽/мес |
|
Стоимость LLM (DeepSeek, ~100 дайджестов) |
~$0.10/мес |
Что дальше
-
Персонализация - сохранять фидбек (лайк/дизлайк), обучать ранжирование под интересы пользователя
-
Кэширование - не парсить каналы при каждом запросе, фоновое обновление по cron
-
Rate limiting - защита от абуза (сейчас лимит: 10 каналов на пользователя)
Выводы
-
Telethon > Bot API для чтения каналов. Bot API не умеет читать сообщения из чужих каналов.
-
Региональные ограничения реальны. Groq, OpenAI заблокированы в РФ. DeepSeek работает.
-
Параллельность решает. Простой
asyncio.gather()с семафором дал ускорение в 4.5 раза. -
SQLite достаточно для MVP. С WAL-режимом справляется с конкурентными запросами.
-
MVP можно запустить за 500 ₽/мес. Не нужен дорогой сервер для Telegram-бота.
Бот работает в бета-режиме: t.me/briefka_bot
Буду рад вопросам и фидбеку в комментариях.
Автор: Tezarium
