- PVSM.RU - https://www.pvsm.ru -
На днях мне прилетела задача, в которой нужно было вычерпывать данные по HTTP с такими вводными:
Есть ограничение по количеству запросов в минуту
Объём данных - миллионы записей
Один запрос выполняется долго (возвращает много данных)
Нужен асинхронный механизм выгрузки
Не включая , я начал накидывать решение...
Грабли №1: async, который работает синхронно
async def fetch_all_pages():
...
while True:
response = await fetch( # ← ошибка
f"/resource?page={page}"
)
...
page += 1
...
Формально:
async
await
Фактически:
один запрос за раз
каждый следующий запрос ждёт предыдущий
Async-код не становится параллельным автоматически. Если каждый запрос ждёт завершения предыдущего - это синхронное исполнение в async-синтаксисе.
Грабли №2: «давайте просто gather всё»
Окей, давайте делать «настоящий async».
async def fetch_all():
tasks = [fetch(page) for page in range(1, 1000)]
results = await asyncio.gather(*tasks) # ← ошибка
...
Работает. Быстро. Очень быстро. Но мы только что создали монстра.
Что пошло не так:
сoroutine storm - резкий рост потребления RAM
закономерные 429 Too Many Requests, а дальше - блокировки и баны
gather ждёт все задачи, одна зависла → зависло всё
невозможно стримить данные (а это критично в моём кейсе)
asyncio.gatherбез ограничений - это не параллелизм, а хаос.
Грабли №3: «давайте думать, подсказывайте»
Интегрируем семафор - классическое решение из тренажёров и собеседований.
from asyncio import Semaphore
semaphore = Semaphore(10)
async def fetch_page(page):
async with semaphore:
return await fetch(f"/resource?page={page}")
async def main():
tasks = [fetch_page(page) for page in range(1, 1000)]
results = await asyncio.gather(*tasks)
...
Интуитивное ожидание: «Ну значит 10 запросов в секунду»
Реальность:
latency API *плавает** (в моём случае от 8 до 30 секунд), из-за этого:
лимит используется лишь на ~30%
выгрузка растягивается во времени
если API внезапно ускорится (кэш, CDN, оптимизация) - получаем гарантированное кратное увеличение rps - привет, бан.
Для наглядности:
Долгие ответы: 10 / 8 сек ≈ 75 req/min
Деградация API: 10 / 30 сек ≈ 20 req/min
Ускорение API: 10 / 0.5 сек = 1200 req/min
Семафор связывает пропускную способность системы с самым нестабильным параметром - latency внешнего API.
Semaphore регулирует ширину трубы. Но не регулирует поток воды.
Грабли №4: почти работает (но не совсем)
Чтобы RPS не зависел от latency, нужен механизм лимитирования по времени. Я остановился на алгоритме leaky bucket и его практической реализации - пакете aiolimiter.
Leaky bucket - алгоритм, который контролирует скорость обработки запросов, независимо от того, как быстро они обрабатываются.
Аналогия: кран с фиксированным напором - сколько ни лей, вытекает стабильно.
from aiolimiter import AsyncLimiter
limiter = AsyncLimiter(100, 60) # 100 запросов в минуту
semaphore = Semaphore(10) # контроль параллелизма
async def fetch_page(page):
async with semaphore:
await limiter.acquire()
return await fetch(f"/resource?page={page}")
Это действительно решило проблему зависимости от latency.
Но…
корутины всё ещё создаются пачкой
нет стриминга
сложно контролировать ретраи и ошибки
память по-прежнему под нагрузкой
Работает - да. Продакшен-реди - нет.
Антиграбли: золотая середина
Я вышел в интрернет с данным вопросом... и нашел несколько интересных подходов с producer/consumer/worker архитектурой, логика следующая:
Очередь задач → воркеры обрабатывают параллельно → AsyncLimiter контролирует скорость → результаты обрабатываются потоком.
from aiolimiter import AsyncLimiter
async def worker(client, queue, results):
while True:
offset = await queue.get()
if offset is None:
break
await limiter.acquire()
response = await client.get(API_URL, params={"offset": offset})
await results.put(response.json()["results"])
queue.task_done()
async def main():
queue = asyncio.Queue()
results = asyncio.Queue()
for offset in range(0, 1000, 50):
await queue.put(offset)
workers = [
asyncio.create_task(worker(...))
for _ in range(5)
]
while ...:
batch = await results.get()
# обработка данных
Что мы получили:
Экономия памяти - задачи не создаются одномоментно
Стриминг результатов
Контроль перегрузки через количество воркеров
Гибкая работа с ретраями и ошибками
Предсказуемая и стабильная пропускная способность системы
Подводим итоги
Решение не серебрянная пуля, но для моего кейса - устойчивая и продакшен-реди золотая середина.
Автор: maslievilya1
Источник [2]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/python/440898
Ссылки в тексте:
[1] мозг: http://www.braintools.ru
[2] Источник: https://habr.com/ru/articles/983066/?utm_source=habrahabr&utm_medium=rss&utm_campaign=983066
Нажмите здесь для печати.