На днях мне прилетела задача, в которой нужно было вычерпывать данные по HTTP с такими вводными:
-
Есть ограничение по количеству запросов в минуту
-
Объём данных - миллионы записей
-
Один запрос выполняется долго (возвращает много данных)
-
Нужен асинхронный механизм выгрузки
Не включая , я начал накидывать решение...
Грабли №1: async, который работает синхронно
async def fetch_all_pages():
...
while True:
response = await fetch( # ← ошибка
f"/resource?page={page}"
)
...
page += 1
...
Формально:
-
async
-
await
Фактически:
-
один запрос за раз
-
каждый следующий запрос ждёт предыдущий
Async-код не становится параллельным автоматически. Если каждый запрос ждёт завершения предыдущего - это синхронное исполнение в async-синтаксисе.
Грабли №2: «давайте просто gather всё»
Окей, давайте делать «настоящий async».
async def fetch_all():
tasks = [fetch(page) for page in range(1, 1000)]
results = await asyncio.gather(*tasks) # ← ошибка
...
Работает. Быстро. Очень быстро. Но мы только что создали монстра.
Что пошло не так:
-
сoroutine storm - резкий рост потребления RAM
-
закономерные
429 Too Many Requests, а дальше - блокировки и баны -
gatherждёт все задачи, одна зависла → зависло всё -
невозможно стримить данные (а это критично в моём кейсе)
asyncio.gatherбез ограничений - это не параллелизм, а хаос.
Грабли №3: «давайте думать, подсказывайте»
Интегрируем семафор - классическое решение из тренажёров и собеседований.
from asyncio import Semaphore
semaphore = Semaphore(10)
async def fetch_page(page):
async with semaphore:
return await fetch(f"/resource?page={page}")
async def main():
tasks = [fetch_page(page) for page in range(1, 1000)]
results = await asyncio.gather(*tasks)
...
Интуитивное ожидание: «Ну значит 10 запросов в секунду»
Реальность:
latency API *плавает** (в моём случае от 8 до 30 секунд), из-за этого:
-
лимит используется лишь на ~30%
-
выгрузка растягивается во времени
-
если API внезапно ускорится (кэш, CDN, оптимизация) - получаем гарантированное кратное увеличение rps - привет, бан.
Для наглядности:
-
Долгие ответы:
10 / 8 сек ≈ 75 req/min -
Деградация API:
10 / 30 сек ≈ 20 req/min -
Ускорение API:
10 / 0.5 сек = 1200 req/min
Семафор связывает пропускную способность системы с самым нестабильным параметром - latency внешнего API.
Semaphore регулирует ширину трубы. Но не регулирует поток воды.
Грабли №4: почти работает (но не совсем)
Чтобы RPS не зависел от latency, нужен механизм лимитирования по времени. Я остановился на алгоритме leaky bucket и его практической реализации - пакете aiolimiter.
Leaky bucket - алгоритм, который контролирует скорость обработки запросов, независимо от того, как быстро они обрабатываются.
Аналогия: кран с фиксированным напором - сколько ни лей, вытекает стабильно.
from aiolimiter import AsyncLimiter
limiter = AsyncLimiter(100, 60) # 100 запросов в минуту
semaphore = Semaphore(10) # контроль параллелизма
async def fetch_page(page):
async with semaphore:
await limiter.acquire()
return await fetch(f"/resource?page={page}")
Это действительно решило проблему зависимости от latency.
Но…
-
корутины всё ещё создаются пачкой
-
нет стриминга
-
сложно контролировать ретраи и ошибки
-
память по-прежнему под нагрузкой
Работает - да. Продакшен-реди - нет.
Антиграбли: золотая середина
Я вышел в интрернет с данным вопросом... и нашел несколько интересных подходов с producer/consumer/worker архитектурой, логика следующая:
Очередь задач → воркеры обрабатывают параллельно → AsyncLimiter контролирует скорость → результаты обрабатываются потоком.
from aiolimiter import AsyncLimiter
async def worker(client, queue, results):
while True:
offset = await queue.get()
if offset is None:
break
await limiter.acquire()
response = await client.get(API_URL, params={"offset": offset})
await results.put(response.json()["results"])
queue.task_done()
async def main():
queue = asyncio.Queue()
results = asyncio.Queue()
for offset in range(0, 1000, 50):
await queue.put(offset)
workers = [
asyncio.create_task(worker(...))
for _ in range(5)
]
while ...:
batch = await results.get()
# обработка данных
Что мы получили:
-
Экономия памяти - задачи не создаются одномоментно
-
Стриминг результатов
-
Контроль перегрузки через количество воркеров
-
Гибкая работа с ретраями и ошибками
-
Предсказуемая и стабильная пропускная способность системы
Подводим итоги
Решение не серебрянная пуля, но для моего кейса - устойчивая и продакшен-реди золотая середина.
Автор: maslievilya1
