- PVSM.RU - https://www.pvsm.ru -
Cитуация: у вас в проекте давно живёт Dishka. Контейнер настроен, пулы, фабрики UoW. И тут вы добавляете LLM-агента (например, на AG2 [1]). А он такой:
«Спасибо, держи
dependencies={'session': session, 'repo': repo}и мою собственную DI-библиотеку. Не благодари».
То есть теперь у вас два параллельных DI-контейнера: один для HTTP-роутов, второй — для тулз агента. Один и тот же сценарий приходится регистрировать дважды.
На каждый /chat-роут — руками собирать словарь зависимостей и передавать его в agent.ask(dependencies={...}). И всё это — рядом с уже существующим Dishka-контейнером, который ровно эту работу делать умеет.
Эта статья — про то, как этого больше не делать. Внутри:
зачем агенту вообще нужен полноценный DI и почему AG2 поставляется со встроенным fast-depends [2];
пакет dishka-ag2 [3]— мост между AG2 и Dishka, который превращает два контейнера в один;
рабочий пример, который я собрал: FastAPI + AG2-агент со SSE-стримом, тулзы ходят в Postgres через типизированный DI;
транзакционная семантика tool calls — что происходит, когда модель вызывает три инструмента параллельно и один из них падает.
Полный код примера: github.com/vvlrff/ag2_ag-ui_example [4] — со всеми миграциями, Docker-сборкой и тестами.
AG2 предлагает прокидывать зависимости в инструменты через словарь — либо на уровне агента, либо на каждый прогон:
# на уровне агента (живёт всю жизнь приложения)
agent = Agent(
tools=[my_tool],
dependencies={"static_dep": something},
)
# на каждый прогон — актуальная сессия летит сюда
await agent.ask("...", dependencies={"db_session": session, "repo": repo})
Достаёшь в тулзе через fast-depends-аннотацию — типизация на месте:
@tool
async def my_tool(session: Annotated[AsyncSession, Inject()]) -> ...:
...
Это работает. И до какого-то момента этого хватает. Что не так начинается дальше:
Дублирование DI. Если в проекте уже есть Dishka — у вас два контейнера. Один и тот же CreateNoteUseCase регистрируется в Dishka (для HTTP-роутов) и где-то рядом собирается руками для dependencies={...}. Две политики жизненных циклов. Любая правка в дереве зависимостей — в двух местах.
Ручная сборка словаря в каждом /chat-роуте. На каждый запрос вы открываете Dishka REQUEST, достаёте оттуда сессию, репо, нужные сервисы, складываете в dict и передаёте в agent.ask(dependencies=...). Это ровно та плумбинг-работа, которую DI-контейнер должен делать сам.
Каждое новое требование (логгер, кеш, http-клиент) дописывается в этот же словарь. Через полгода у вас там 15 ключей, и читать сборку этого словаря — отдельное удовольствие.
Скоуп — на весь прогон, а не на tool call. Сессия, переданная в agent.ask, живёт пока работает прогон. Если LLM дёргает три тулзы подряд — все три сидят на одной сессии и одной транзакции. Нужно «каждый tool call — своя транзакция»? Без dishka-ag2 это придётся писать руками.
Желаемое API — типизированное внедрение, как в FastAPI:
@tool
@inject
async def create_note(
title: str,
body: str = "",
*,
uc: FromDishka[CreateNoteUseCase],
) -> NoteToolResult:
response = await uc.execute(CreateNoteRequest(title=title, body=body))
return NoteToolResult.from_entity(response.note)
LLM в схему инструмента видит только title и body. Сценарий внедряется через DI и живёт ровно столько, сколько работает тулза. Один контейнер на весь проект. Это и есть dishka-ag2.
Чтобы понять, как dishka-ag2 встраивается в AG2, нужно знать одну деталь: внутри AG2 уже работает DI — библиотека fast-depends, автономный механизм работы с зависимостями в духе FastAPI Depends, но без привязки к веб-фреймворку.
fast-depends — это инжектор аргументов, а не контейнер. Разница принципиальная: он умеет распознать Annotated[T, Inject()] в сигнатуре, дёрнуть зарегистрированный провайдер, подставить результат в вызов. Чего у него нет — иерархии скоупов (APP/REQUEST/...) и общего реестра, в котором провайдеры могут зависеть друг от друга и переиспользоваться между разными точками входа.
Заодно — два слова про AG-UI. Это SSE-протокол [5], которым сервер шлёт клиенту события агента в реальном времени:
|
Событие |
Когда |
|
|
границы прогона |
|
|
прогон упал |
|
|
стрим токенов от LLM |
|
|
LLM решил вызвать инструмент |
|
|
результат / завершение вызова |
autogen.beta.ag_ui.AGUIStream— обёртка, которая берёт внутренние события агента и сериализует их в SSE-фреймы. AG-UI поддерживают AG2, LangGraph, Pydantic AI, LlamaIndex, CrewAI и т.д. — так что сам по себе он давно не аргумент «за» какой-то фреймворк. Аргумент здесь — связкаAG2.beta + dishka-ag2.
1. AG2Scope — иерархия скоупов
Семь уровней, но реально вам нужны два:
AG2Scope.APP — синглтоны на жизнь приложения: engine SQLAlchemy, sessionmaker, конфиг, OpenAIConfig.
AG2Scope.REQUEST — открывается на каждый HTTP-запрос (через ASGI-middleware) и на каждый tool call (декоратором @inject). Внутри живут сессия БД, репозитории, сценарии.
2. DishkaAsyncMiddleware — middleware агента
Подключается к Agent. Перехватывает события агента (on_turn, on_tool_execution, on_llm_call) и готовит контекст для будущего открытия скоупа. Сам скоуп REQUEST middleware не открывает — только складывает текущий Context и ToolCallEvent в pending-слот.
3. @inject — декоратор тулзы
Когда AG2 вызывает инструмент, @inject:
Достаёт корневой контейнер из контекста.
Открывает AG2Scope.REQUEST с данными из pending-слота.
Резолвит все аргументы, помеченные FromDishka[T].
Вызывает функцию.
Закрывает скоуп после возврата (или после исключения).
Иными словами, скоуп живёт ровно столько, сколько выполняется тело инструмента. Сессия БД — тоже.
Плюс к этому пакет даёт AG2Provider — провайдер «из коробки», который умеет отдавать ToolCallEvent, Context, ConversationContainer и т.п. в виде зависимостей. Без него типы из самого AG2 в инструмент не внедрятся, поэтому AG2Provider() в default_providers() обязателен.
Жизненный цикл скоупов на одном HTTP-запросе:
HTTP POST /api/chat
│
▼ AG2ContainerMiddleware ── открывает AG2Scope.REQUEST для HTTP
│
▼ chat-route ── берёт agent = app.state.agent (синглтон)
│
▼ AGUIStream(agent).dispatch(run_input)
│ агент решает вызвать инструмент
▼
DishkaAsyncMiddleware.on_tool_execution
│ stash_request_context: сохраняет Context, ToolCallEvent
│ (скоуп НЕ открывает)
▼
@inject create_note(uc: FromDishka[CreateNoteUseCase])
│ открывает AG2Scope.REQUEST со stash-данными
│ резолвит uc через цепочку DI
▼
CreateNoteUseCase → AlchemyNoteRepository → AsyncSession → Postgres
│
▼ После возврата @inject закрывает REQUEST-скоуп
│ (сессия БД отпущена обратно в пул)
▼ ToolResultEvent → AGUIStream → SSE TOOL_CALL_RESULT
Контейнер один. Скоуп REQUEST открывается дважды — на HTTP-запрос и на каждый tool call. Это сделано намеренно: каждый tool call — своя транзакция (см. ниже).
Сборка снизу вверх (только важное)
# domain/entities.py
NoteId = NewType("NoteId", UUID)
@dataclass
class Note:
id: NoteId
title: str
body: str
created_at: datetime
# gateways/db/note/interface.py
class NoteRepository(Protocol):
async def create(self, note: Note) -> Note: ...
async def list_notes(self, limit: int = 20) -> list[Note]: ...
async def delete(self, note_id: NoteId) -> bool: ...
Реализация на SQLAlchemy 2.0 — полный код в репозитории [4]. Транзакция в репозитории явно не закрывается — этим занимается UoW.
Сценарий + UnitOfWork
# usecases/uow.py
class UnitOfWork(Protocol):
async def commit(self) -> None: ...
async def rollback(self) -> None: ...
async def flush(self) -> None: ...
AsyncSession структурно удовлетворяет этому протоколу — отдельный класс не нужен. Это пригодится через шаг.
class CreateNoteUseCase:
def __init__(self, repo: NoteRepository, uow: UnitOfWork) -> None:
self._repo = repo
self._uow = uow
async def execute(self, request: CreateNoteRequest) -> CreateNoteResponse:
note = Note(
id=NoteId(uuid4()),
title=request.title,
body=request.body,
created_at=datetime.now(UTC)
)
created = await self._repo.create(note)
await self._uow.commit()
return CreateNoteResponse(note=created)
Сценарий не знает ни про FastAPI, ни про SQLAlchemy, ни про AG2 — только два протокола. Тестируется в полной изоляции.
DI-провайдер с одним хитрым приёмом
class DatabaseProvider(Provider):
@provide(scope=AG2Scope.APP)
async def provide_async_engine(self, settings: Settings) -> AsyncIterator[AsyncEngine]:
engine = create_async_engine(
make_url(settings.database_url),
pool_size=10,
max_overflow=10,
pool_timeout=10,
pool_pre_ping=True,
)
yield engine
await engine.dispose()
@provide(scope=AG2Scope.APP)
def provide_sessionmaker(self, engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
return async_sessionmaker[AsyncSession](
bind=engine,
autoflush=False,
autocommit=False,
expire_on_commit=False,
)
@provide(scope=AG2Scope.REQUEST)
async def provide_async_session(
self, pool: async_sessionmaker[AsyncSession]
) -> AsyncIterator[AnyOf[AsyncSession, UnitOfWork]]: # ← ключевая строчка
async with pool() as session:
yield session
AnyOf[AsyncSession, UnitOfWork] — Dishka регистрирует один и тот же объект под двумя интерфейсами. Репозиторий получит его как AsyncSession, сценарий — как UnitOfWork. Это одна сессия, поэтому await uow.commit() коммитит транзакцию, в которой репозиторий сделал add и flush.
Сборка контейнера
def default_providers() -> tuple[Provider, ...]:
return (
SettingsProvider(),
DatabaseProvider(),
RepositoryProvider(),
UseCaseProvider(),
AgentProvider(),
AG2Provider(), # обязателен
)
def create_container(*providers, context=None) -> AsyncContainer:
return make_async_container(
*providers,
context=context,
scopes=AG2Scope, # переключатель на иерархию dishka-ag2
)
scopes=AG2Scope вместо стандартной Scope.{APP,REQUEST,...} Dishka — иначе AG2Scope.REQUEST не подцепится.
Инструмент: @tool
from autogen.beta import Toolkit, tool
from dishka_ag2 import FromDishka, inject
@tool
@inject
async def create_note(
uc: FromDishka[CreateNoteUseCase],
title: Annotated[str, Field(description="Short title of the note.")],
body: Annotated[str, Field(description="Full text body (optional).")] = "",
) -> dict[str, str]:
"""Create a new note in the database."""
response = await uc.execute(CreateNoteRequest(title=title, body=body))
return NoteToolResult.from_entity(response.note)
Три детали, на которых легко споткнуться:
Порядок декораторов: сначала @tool, потом @inject.
@injectпринимает функцию и возвращает функцию.@toolпринимает функцию и возвращает объектFunctionTool— именно его ждётAgent(tools=[...]). В правильном порядке@injectсрабатывает первым (внутренний), потом@toolпревращает результат вFunctionTool. Если поменять местами —@toolсначала вернётFunctionTool,@injectобернёт его как обычный callable, и AG2 такой результат как инструмент не примет.
FromDishka[...] скрыт от LLM.
Внутри AG2 для сборки JSON Schema инструмента используется механизм
fast-depends, который умеет отличать «зависимости» от «бизнесовых параметров».@injectпомечаетFromDishka[T]как зависимость — в схему она не попадает. LLM видит толькоtitleиbody.
Docstring и типы — это контракт инструмента для LLM.
То, что в
Args:, попадает в описание параметров. Чем точнее формулировка — тем чаще модель попадает в нужный набор аргументов.
Сборка агента
def build_agent(config: OpenAIConfig, container: AsyncContainer) -> Agent:
return Agent(
name="example_assistant",
prompt=SYSTEM_PROMPT,
config=config,
tools=[weather, notes_toolkit()],
middleware=[Middleware(DishkaAsyncMiddleware, container=container)],
)
Агент конструируется один раз при старте и кладётся в app.state.agent. На каждый HTTP-запрос его не пересобирать. Это работает, потому что REQUEST-скоуп открывается на каждый tool call декоратором @inject, а не агентом.
ASGI-middleware для REST-роутов
Стандартная FastAPI-интеграция Dishka не подходит из коробки — её хук на Scope.REQUEST рассчитан на стандартную иерархию, а у нас своя. Поэтому пишем ASGI-middleware на 12 строк:
class AG2ContainerMiddleware:
def __init__(self, app: ASGIApp, container: AsyncContainer) -> None:
self.app = app
self.container = container
async def __call__(self, scope, receive, send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
request = Request(scope, receive, send)
async with self.container(
context={Request: request}, scope=AG2Scope.REQUEST,
) as request_container:
request.state.dishka_container = request_container
await self.app(scope, receive, send)
И эндпоинт /api/chat без единого упоминания контейнера:
@router.post("")
async def run_agent(
run_input: Annotated[RunAgentInput, Body()],
request: Request,
accept: Annotated[str | None, Header()] = None,
) -> StreamingResponse:
agent: Agent = request.app.state.agent
stream = AGUIStream(agent)
return StreamingResponse(
stream.dispatch(run_input, accept=accept),
media_type=accept or "text/event-stream",
)
Главная мысль: каждый tool call выполняется в собственной транзакции. Из этого вытекает несколько следствий, которые стоит проговорить явно.
Видимость данных между tool calls.
LLM может за один прогон вызвать
create_note, потомlist_notes. Это разные tool calls с разными сессиями. К моменту, когдаlist_notesначнёт работать, его сессия видит всё, что закоммитила сессияcreate_note. Поведение естественное и предсказуемое.
Параллельные tool calls.
OpenAI и Anthropic могут вернуть несколько tool calls в одном ответе. AG2.beta отдаёт их параллельно. Каждый — в своём
REQUEST-скоупе, со своей сессией из пула. Поэтомуpool_size=10, max_overflow=10— не наугад: это запас на N параллельных тулз. Если у вас агенты на 20 параллельных инструментов — увеличивайте пул.
Если один tool call упал — откатывается только его транзакция.
Если
create_noteуже закоммитился, аdelete_external_thingупал — заметка останется. Общей транзакции на весь прогон нет.
Один сценарий — это один UseCase, а не несколько тулз
Это главная архитектурная мысль статьи. Когда у бизнес-действия есть несколько эффектов — записать в БД, отправить уведомление, опубликовать событие в брокер — всё это входит в один сценарий. Не в десять тулз, и не в одну тулзу с явным session.begin().
Тулза остаётся обёрткой, вся оркестрация — внутри сценария, например:
class CreateNoteUseCase:
def __init__(
self,
notes: NoteRepository,
events: EventBus,
notifier: NotificationService,
uow: UnitOfWork
) -> None:
self._notes = notes
self._events = events
self._notifier = notifier
self._uow = uow
async def execute(self, request: CreateNoteRequest) -> CreateNoteResponse:
try:
note = await self._notes.create(Note(...))
await self._events.publish(NoteCreated(note_id=note.id))
await self._notifier.send(...)
await self._uow.commit()
return CreateNoteResponse(note=note)
except Exception:
await self._uow.rollback()
raise
raise в конце — обязательный. Если «съесть» исключение, модель решит, что операция прошла, и пойдёт дальше с неверным состоянием.
Антипаттерн, чтобы было нагляднее
# ❌ так не надо
@tool
@inject
async def create_note(
session: FromDishka[AsyncSession], # деталь инфраструктуры в API тулзы
notes: FromDishka[NoteRepository], # скоп оркестрации в тулзе
events: FromDishka[EventBus], # туда же
title: str,
body: str = "",
) -> dict[str, str]: ...
Тулза знает про БД, репозиторий, события — три слоя в одном месте. Любое изменение бизнес-логики потащит правки сюда. Правильно — оставить тулзе одну зависимость на сценарий и положить всю сложность за его границу.
Когда выбирать эту связку
AG-UI как протокол поддерживается широким набором фреймворков — LangGraph, Pydantic AI, LlamaIndex, CrewAI, Mastra и др. Сам AG-UI давно не аргумент «за» один конкретный фреймворк.
Что осознанно выбирается в AG2.beta + dishka-ag2:
Готовый мост к Dishka. Если в проекте уже Dishka — dishka-ag2 решает это сразу, без второго параллельного DI и без dependencies={...}-словарей.
Multi-agent-оркестрация. Историческая ниша AG2: групповые чаты, hand-off, паттерны «менеджер — исполнители». Если архитектура одного агента перестаёт хватать — растягивается без смены фреймворка.
AG2.beta + dishka-ag2 — рабочая связка для ИИ-агента с нормальной архитектурой: один DI-контейнер на проект, типизированное внедрение в тулзы, предсказуемая транзакционная семантика и ноль магических словарей в коде агента.
Автор: vvlrff
Источник [6]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/di/451454
Ссылки в тексте:
[1] AG2: https://github.com/ag2ai/ag2
[2] fast-depends: https://github.com/Lancetnik/FastDepends
[3] dishka-ag2: https://github.com/C3EQUALZz/dishka-ag2
[4] github.com/vvlrff/ag2_ag-ui_example: https://github.com/vvlrff/ag2_ag-ui_example
[5] SSE-протокол: https://github.com/ag-ui-protocol/ag-ui
[6] Источник: https://habr.com/ru/articles/1034102/?utm_source=habrahabr&utm_medium=rss&utm_campaign=1034102
Нажмите здесь для печати.