Как мы научили AG2 дружить с нормальным DI (и почему это вообще нужно)

в 8:15, , рубрики: ag-ui, ag2, ai agent, autogen, di, dishka

Cитуация: у вас в проекте давно живёт Dishka. Контейнер настроен, пулы, фабрики UoW. И тут вы добавляете LLM-агента (например, на AG2). А он такой:

«Спасибо, держи dependencies={'session': session, 'repo': repo} и мою собственную DI-библиотеку. Не благодари».

То есть теперь у вас два параллельных DI-контейнера: один для HTTP-роутов, второй — для тулз агента. Один и тот же сценарий приходится регистрировать дважды.
На каждый /chat-роут — руками собирать словарь зависимостей и передавать его в agent.ask(dependencies={...}). И всё это — рядом с уже существующим Dishka-контейнером, который ровно эту работу делать умеет.

Эта статья — про то, как этого больше не делать. Внутри:

  • зачем агенту вообще нужен полноценный DI и почему AG2 поставляется со встроенным fast-depends;

  • пакет dishka-ag2— мост между AG2 и Dishka, который превращает два контейнера в один;

  • рабочий пример, который я собрал: FastAPI + AG2-агент со SSE-стримом, тулзы ходят в Postgres через типизированный DI;

  • транзакционная семантика tool calls — что происходит, когда модель вызывает три инструмента параллельно и один из них падает.

Полный код примера: github.com/vvlrff/ag2_ag-ui_example — со всеми миграциями, Docker-сборкой и тестами.

Проблема: почему dependencies={...} — это боль

AG2 предлагает прокидывать зависимости в инструменты через словарь — либо на уровне агента, либо на каждый прогон:

# на уровне агента (живёт всю жизнь приложения)
agent = Agent(
    tools=[my_tool],
    dependencies={"static_dep": something},
)


# на каждый прогон — актуальная сессия летит сюда
await agent.ask("...", dependencies={"db_session": session, "repo": repo})

Достаёшь в тулзе через fast-depends-аннотацию — типизация на месте:

@tool
async def my_tool(session: Annotated[AsyncSession, Inject()]) -> ...:
    ...

Это работает. И до какого-то момента этого хватает. Что не так начинается дальше:

  1. Дублирование DI. Если в проекте уже есть Dishka — у вас два контейнера. Один и тот же CreateNoteUseCase регистрируется в Dishka (для HTTP-роутов) и где-то рядом собирается руками для dependencies={...}. Две политики жизненных циклов. Любая правка в дереве зависимостей — в двух местах.

  2. Ручная сборка словаря в каждом /chat-роуте. На каждый запрос вы открываете Dishka REQUEST, достаёте оттуда сессию, репо, нужные сервисы, складываете в dict и передаёте в agent.ask(dependencies=...). Это ровно та плумбинг-работа, которую DI-контейнер должен делать сам.

  3. Каждое новое требование (логгер, кеш, http-клиент) дописывается в этот же словарь. Через полгода у вас там 15 ключей, и читать сборку этого словаря — отдельное удовольствие.

  4. Скоуп — на весь прогон, а не на tool call. Сессия, переданная в agent.ask, живёт пока работает прогон. Если LLM дёргает три тулзы подряд — все три сидят на одной сессии и одной транзакции. Нужно «каждый tool call — своя транзакция»? Без dishka-ag2 это придётся писать руками.

Желаемое API — типизированное внедрение, как в FastAPI:

@tool
@inject
async def create_note(
    title: str,
    body: str = "",
    *,
    uc: FromDishka[CreateNoteUseCase],
) -> NoteToolResult:
    response = await uc.execute(CreateNoteRequest(title=title, body=body))
    return NoteToolResult.from_entity(response.note)

LLM в схему инструмента видит только title и body. Сценарий внедряется через DI и живёт ровно столько, сколько работает тулза. Один контейнер на весь проект. Это и есть dishka-ag2.

Под капотом AG2: зачем там fast-depends

Чтобы понять, как dishka-ag2 встраивается в AG2, нужно знать одну деталь: внутри AG2 уже работает DI — библиотека fast-depends, автономный механизм работы с зависимостями в духе FastAPI Depends, но без привязки к веб-фреймворку.

fast-depends — это инжектор аргументов, а не контейнер. Разница принципиальная: он умеет распознать Annotated[T, Inject()] в сигнатуре, дёрнуть зарегистрированный провайдер, подставить результат в вызов. Чего у него нет — иерархии скоупов (APP/REQUEST/...) и общего реестра, в котором провайдеры могут зависеть друг от друга и переиспользоваться между разными точками входа.

Заодно — два слова про AG-UI. Это SSE-протокол, которым сервер шлёт клиенту события агента в реальном времени:

Событие

Когда

RUN_STARTED / RUN_FINISHED

границы прогона

RUN_ERROR

прогон упал

TEXT_MESSAGE_CHUNK

стрим токенов от LLM

TOOL_CALL_START / TOOL_CALL_ARGS

LLM решил вызвать инструмент

TOOL_CALL_RESULT / TOOL_CALL_END

результат / завершение вызова

autogen.beta.ag_ui.AGUIStream — обёртка, которая берёт внутренние события агента и сериализует их в SSE-фреймы. AG-UI поддерживают AG2, LangGraph, Pydantic AI, LlamaIndex, CrewAI и т.д. — так что сам по себе он давно не аргумент «за» какой-то фреймворк. Аргумент здесь — связка AG2.beta + dishka-ag2.

Пакет dishka-ag2: три кирпичика

1. AG2Scope — иерархия скоупов

Семь уровней, но реально вам нужны два:

  • AG2Scope.APP — синглтоны на жизнь приложения: engine SQLAlchemy, sessionmaker, конфиг, OpenAIConfig.

  • AG2Scope.REQUEST — открывается на каждый HTTP-запрос (через ASGI-middleware) и на каждый tool call (декоратором @inject). Внутри живут сессия БД, репозитории, сценарии.

2. DishkaAsyncMiddleware — middleware агента

Подключается к Agent. Перехватывает события агента (on_turn, on_tool_execution, on_llm_call) и готовит контекст для будущего открытия скоупа. Сам скоуп REQUEST middleware не открывает — только складывает текущий Context и ToolCallEvent в pending-слот.

3. @inject — декоратор тулзы

Когда AG2 вызывает инструмент, @inject:

  1. Достаёт корневой контейнер из контекста.

  2. Открывает AG2Scope.REQUEST с данными из pending-слота.

  3. Резолвит все аргументы, помеченные FromDishka[T].

  4. Вызывает функцию.

  5. Закрывает скоуп после возврата (или после исключения).

Иными словами, скоуп живёт ровно столько, сколько выполняется тело инструмента. Сессия БД — тоже.

Плюс к этому пакет даёт AG2Provider — провайдер «из коробки», который умеет отдавать ToolCallEvent, Context, ConversationContainer и т.п. в виде зависимостей. Без него типы из самого AG2 в инструмент не внедрятся, поэтому AG2Provider() в default_providers() обязателен.

Архитектура примера

Жизненный цикл скоупов на одном HTTP-запросе:

HTTP POST /api/chat
   │
   ▼ AG2ContainerMiddleware ── открывает AG2Scope.REQUEST для HTTP
   │
   ▼ chat-route ── берёт agent = app.state.agent (синглтон)
   │
   ▼ AGUIStream(agent).dispatch(run_input)
   │      агент решает вызвать инструмент
   ▼
   DishkaAsyncMiddleware.on_tool_execution
   │      stash_request_context: сохраняет Context, ToolCallEvent
   │      (скоуп НЕ открывает)
   ▼
   @inject create_note(uc: FromDishka[CreateNoteUseCase])
   │      открывает AG2Scope.REQUEST со stash-данными
   │      резолвит uc через цепочку DI
   ▼
   CreateNoteUseCase → AlchemyNoteRepository → AsyncSession → Postgres
   │
   ▼ После возврата @inject закрывает REQUEST-скоуп
   │      (сессия БД отпущена обратно в пул)
   ▼ ToolResultEvent → AGUIStream → SSE TOOL_CALL_RESULT

Контейнер один. Скоуп REQUEST открывается дважды — на HTTP-запрос и на каждый tool call. Это сделано намеренно: каждый tool call — своя транзакция (см. ниже).

Сборка снизу вверх (только важное)

# domain/entities.py
NoteId = NewType("NoteId", UUID)

@dataclass
class Note:
    id: NoteId
    title: str
    body: str
    created_at: datetime

    
# gateways/db/note/interface.py
class NoteRepository(Protocol):
    async def create(self, note: Note) -> Note: ...
    async def list_notes(self, limit: int = 20) -> list[Note]: ...
    async def delete(self, note_id: NoteId) -> bool: ...

Реализация на SQLAlchemy 2.0 — полный код в репозитории. Транзакция в репозитории явно не закрывается — этим занимается UoW.

Сценарий + UnitOfWork

# usecases/uow.py
class UnitOfWork(Protocol):
    async def commit(self) -> None: ...
    async def rollback(self) -> None: ...
    async def flush(self) -> None: ...

AsyncSession структурно удовлетворяет этому протоколу — отдельный класс не нужен. Это пригодится через шаг.

class CreateNoteUseCase:
    def __init__(self, repo: NoteRepository, uow: UnitOfWork) -> None:
        self._repo = repo
        self._uow = uow

    async def execute(self, request: CreateNoteRequest) -> CreateNoteResponse:
        note = Note(
            id=NoteId(uuid4()), 
            title=request.title,
            body=request.body, 
            created_at=datetime.now(UTC)
        )
        created = await self._repo.create(note)
        await self._uow.commit()
        return CreateNoteResponse(note=created)

Сценарий не знает ни про FastAPI, ни про SQLAlchemy, ни про AG2 — только два протокола. Тестируется в полной изоляции.

DI-провайдер с одним хитрым приёмом

class DatabaseProvider(Provider):
    @provide(scope=AG2Scope.APP)
    async def provide_async_engine(self, settings: Settings) -> AsyncIterator[AsyncEngine]:
        engine = create_async_engine(
            make_url(settings.database_url),
            pool_size=10,
            max_overflow=10,
            pool_timeout=10,
            pool_pre_ping=True,
        )
        yield engine
        await engine.dispose()

    @provide(scope=AG2Scope.APP)
    def provide_sessionmaker(self, engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
        return async_sessionmaker[AsyncSession](
            bind=engine,
            autoflush=False,
            autocommit=False,
            expire_on_commit=False,
        )

    @provide(scope=AG2Scope.REQUEST)
    async def provide_async_session(
        self, pool: async_sessionmaker[AsyncSession]
    ) -> AsyncIterator[AnyOf[AsyncSession, UnitOfWork]]:  # ← ключевая строчка
        async with pool() as session:
            yield session

AnyOf[AsyncSession, UnitOfWork] — Dishka регистрирует один и тот же объект под двумя интерфейсами. Репозиторий получит его как AsyncSession, сценарий — как UnitOfWork. Это одна сессия, поэтому await uow.commit() коммитит транзакцию, в которой репозиторий сделал add и flush.

Сборка контейнера

def default_providers() -> tuple[Provider, ...]:
    return (
        SettingsProvider(),
        DatabaseProvider(),
        RepositoryProvider(),
        UseCaseProvider(),
        AgentProvider(),
        AG2Provider(),  # обязателен
    )

def create_container(*providers, context=None) -> AsyncContainer:
    return make_async_container(
        *providers,
        context=context,
        scopes=AG2Scope,  # переключатель на иерархию dishka-ag2
    )

scopes=AG2Scope вместо стандартной Scope.{APP,REQUEST,...} Dishka — иначе AG2Scope.REQUEST не подцепится.

Инструмент: @tool

from autogen.beta import Toolkit, tool
from dishka_ag2 import FromDishka, inject

@tool
@inject
async def create_note(
    uc: FromDishka[CreateNoteUseCase],
    title: Annotated[str, Field(description="Short title of the note.")],
    body: Annotated[str, Field(description="Full text body (optional).")] = "",
) -> dict[str, str]:
    """Create a new note in the database."""
    response = await uc.execute(CreateNoteRequest(title=title, body=body))
    return NoteToolResult.from_entity(response.note)

Три детали, на которых легко споткнуться:

  • Порядок декораторов: сначала @tool, потом @inject.

@inject принимает функцию и возвращает функцию. @tool принимает функцию и возвращает объект FunctionTool — именно его ждёт Agent(tools=[...]). В правильном порядке @inject срабатывает первым (внутренний), потом @tool превращает результат в FunctionTool. Если поменять местами — @tool сначала вернёт FunctionTool, @inject обернёт его как обычный callable, и AG2 такой результат как инструмент не примет.

  • FromDishka[...] скрыт от LLM.

Внутри AG2 для сборки JSON Schema инструмента используется механизм fast-depends, который умеет отличать «зависимости» от «бизнесовых параметров». @inject помечает FromDishka[T] как зависимость — в схему она не попадает. LLM видит только title и body.

  • Docstring и типы — это контракт инструмента для LLM.

То, что в Args:, попадает в описание параметров. Чем точнее формулировка — тем чаще модель попадает в нужный набор аргументов.

Сборка агента

def build_agent(config: OpenAIConfig, container: AsyncContainer) -> Agent:
    return Agent(
        name="example_assistant",
        prompt=SYSTEM_PROMPT,
        config=config,
        tools=[weather, notes_toolkit()],
        middleware=[Middleware(DishkaAsyncMiddleware, container=container)],
    )

Агент конструируется один раз при старте и кладётся в app.state.agent. На каждый HTTP-запрос его не пересобирать. Это работает, потому что REQUEST-скоуп открывается на каждый tool call декоратором @inject, а не агентом.

ASGI-middleware для REST-роутов

Стандартная FastAPI-интеграция Dishka не подходит из коробки — её хук на Scope.REQUEST рассчитан на стандартную иерархию, а у нас своя. Поэтому пишем ASGI-middleware на 12 строк:

class AG2ContainerMiddleware:
    def __init__(self, app: ASGIApp, container: AsyncContainer) -> None:
        self.app = app
        self.container = container

    async def __call__(self, scope, receive, send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        request = Request(scope, receive, send)
        async with self.container(
            context={Request: request}, scope=AG2Scope.REQUEST,
        ) as request_container:
            request.state.dishka_container = request_container
            await self.app(scope, receive, send)

И эндпоинт /api/chat без единого упоминания контейнера:

@router.post("")
async def run_agent(
    run_input: Annotated[RunAgentInput, Body()],
    request: Request,
    accept: Annotated[str | None, Header()] = None,
) -> StreamingResponse:
    agent: Agent = request.app.state.agent
    stream = AGUIStream(agent)
    return StreamingResponse(
        stream.dispatch(run_input, accept=accept),
        media_type=accept or "text/event-stream",
    )

Транзакционная семантика tool calls

Главная мысль: каждый tool call выполняется в собственной транзакции. Из этого вытекает несколько следствий, которые стоит проговорить явно.

Видимость данных между tool calls.

LLM может за один прогон вызвать create_note, потом list_notes. Это разные tool calls с разными сессиями. К моменту, когда list_notes начнёт работать, его сессия видит всё, что закоммитила сессия create_note. Поведение естественное и предсказуемое.

Параллельные tool calls.

OpenAI и Anthropic могут вернуть несколько tool calls в одном ответе. AG2.beta отдаёт их параллельно. Каждый — в своём REQUEST-скоупе, со своей сессией из пула. Поэтому pool_size=10, max_overflow=10 — не наугад: это запас на N параллельных тулз. Если у вас агенты на 20 параллельных инструментов — увеличивайте пул.

Если один tool call упал — откатывается только его транзакция.

Если create_note уже закоммитился, а delete_external_thing упал — заметка останется. Общей транзакции на весь прогон нет.

Один сценарий — это один UseCase, а не несколько тулз

Это главная архитектурная мысль статьи. Когда у бизнес-действия есть несколько эффектов — записать в БД, отправить уведомление, опубликовать событие в брокер — всё это входит в один сценарий. Не в десять тулз, и не в одну тулзу с явным session.begin().

Тулза остаётся обёрткой, вся оркестрация — внутри сценария, например:

class CreateNoteUseCase:
    def __init__(
        self, 
        notes: NoteRepository, 
        events: EventBus,
        notifier: NotificationService, 
        uow: UnitOfWork
    ) -> None:
        self._notes = notes
        self._events = events
        self._notifier = notifier
        self._uow = uow

    async def execute(self, request: CreateNoteRequest) -> CreateNoteResponse:
        try:
            note = await self._notes.create(Note(...))
            await self._events.publish(NoteCreated(note_id=note.id))
            await self._notifier.send(...)
            await self._uow.commit()
            return CreateNoteResponse(note=note)
        except Exception:
            await self._uow.rollback()
            raise

raise в конце — обязательный. Если «съесть» исключение, модель решит, что операция прошла, и пойдёт дальше с неверным состоянием.

Антипаттерн, чтобы было нагляднее

# ❌ так не надо
@tool
@inject
async def create_note(
    session: FromDishka[AsyncSession],         # деталь инфраструктуры в API тулзы
    notes: FromDishka[NoteRepository],         # скоп оркестрации в тулзе
    events: FromDishka[EventBus],              # туда же
    title: str, 
    body: str = "",
) -> dict[str, str]: ...

Тулза знает про БД, репозиторий, события — три слоя в одном месте. Любое изменение бизнес-логики потащит правки сюда. Правильно — оставить тулзе одну зависимость на сценарий и положить всю сложность за его границу.

Когда выбирать эту связку

AG-UI как протокол поддерживается широким набором фреймворков — LangGraph, Pydantic AI, LlamaIndex, CrewAI, Mastra и др. Сам AG-UI давно не аргумент «за» один конкретный фреймворк.

Что осознанно выбирается в AG2.beta + dishka-ag2:

  • Готовый мост к Dishka. Если в проекте уже Dishka — dishka-ag2 решает это сразу, без второго параллельного DI и без dependencies={...}-словарей.

  • Multi-agent-оркестрация. Историческая ниша AG2: групповые чаты, hand-off, паттерны «менеджер — исполнители». Если архитектура одного агента перестаёт хватать — растягивается без смены фреймворка.

Заключение

AG2.beta + dishka-ag2 — рабочая связка для ИИ-агента с нормальной архитектурой: один DI-контейнер на проект, типизированное внедрение в тулзы, предсказуемая транзакционная семантика и ноль магических словарей в коде агента.

Автор: vvlrff

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js