Как мы научили AG2 дружить с нормальным DI (и почему это вообще нужно)

от автора

Cитуация: у вас в проекте давно живёт Dishka. Контейнер настроен, пулы, фабрики UoW. И тут вы добавляете LLM-агента (например, на AG2). А он такой:

«Спасибо, держи dependencies={'session': session, 'repo': repo} и мою собственную DI-библиотеку. Не благодари».

То есть теперь у вас два параллельных DI-контейнера: один для HTTP-роутов, второй — для тулз агента. Один и тот же сценарий приходится регистрировать дважды.
На каждый /chat-роут — руками собирать словарь зависимостей и передавать его в agent.ask(dependencies={...}). И всё это — рядом с уже существующим Dishka-контейнером, который ровно эту работу делать умеет.

Эта статья — про то, как этого больше не делать. Внутри:

  • зачем агенту вообще нужен полноценный DI и почему AG2 поставляется со встроенным fast-depends;

  • пакет dishka-ag2— мост между AG2 и Dishka, который превращает два контейнера в один;

  • рабочий пример, который я собрал: FastAPI + AG2-агент со SSE-стримом, тулзы ходят в Postgres через типизированный DI;

  • транзакционная семантика tool calls — что происходит, когда модель вызывает три инструмента параллельно и один из них падает.

Полный код примера: github.com/vvlrff/ag2_ag-ui_example — со всеми миграциями, Docker-сборкой и тестами.

Проблема: почему dependencies={…} — это боль

AG2 предлагает прокидывать зависимости в инструменты через словарь — либо на уровне агента, либо на каждый прогон:

# на уровне агента (живёт всю жизнь приложения)agent = Agent(    tools=[my_tool],    dependencies={"static_dep": something},)# на каждый прогон — актуальная сессия летит сюдаawait agent.ask("...", dependencies={"db_session": session, "repo": repo})

Достаёшь в тулзе через fast-depends-аннотацию — типизация на месте:

@toolasync def my_tool(session: Annotated[AsyncSession, Inject()]) -> ...:    ...

Это работает. И до какого-то момента этого хватает. Что не так начинается дальше:

  1. Дублирование DI. Если в проекте уже есть Dishka — у вас два контейнера. Один и тот же CreateNoteUseCase регистрируется в Dishka (для HTTP-роутов) и где-то рядом собирается руками для dependencies={...}. Две политики жизненных циклов. Любая правка в дереве зависимостей — в двух местах.

  2. Ручная сборка словаря в каждом /chat-роуте. На каждый запрос вы открываете Dishka REQUEST, достаёте оттуда сессию, репо, нужные сервисы, складываете в dict и передаёте в agent.ask(dependencies=...). Это ровно та плумбинг-работа, которую DI-контейнер должен делать сам.

  3. Каждое новое требование (логгер, кеш, http-клиент) дописывается в этот же словарь. Через полгода у вас там 15 ключей, и читать сборку этого словаря — отдельное удовольствие.

  4. Скоуп — на весь прогон, а не на tool call. Сессия, переданная в agent.ask, живёт пока работает прогон. Если LLM дёргает три тулзы подряд — все три сидят на одной сессии и одной транзакции. Нужно «каждый tool call — своя транзакция»? Без dishka-ag2 это придётся писать руками.

Желаемое API — типизированное внедрение, как в FastAPI:

@tool@injectasync def create_note(    title: str,    body: str = "",    *,    uc: FromDishka[CreateNoteUseCase],) -> NoteToolResult:    response = await uc.execute(CreateNoteRequest(title=title, body=body))    return NoteToolResult.from_entity(response.note)

LLM в схему инструмента видит только title и body. Сценарий внедряется через DI и живёт ровно столько, сколько работает тулза. Один контейнер на весь проект. Это и есть dishka-ag2.

Под капотом AG2: зачем там fast-depends

Чтобы понять, как dishka-ag2 встраивается в AG2, нужно знать одну деталь: внутри AG2 уже работает DI — библиотека fast-depends, автономный механизм работы с зависимостями в духе FastAPI Depends, но без привязки к веб-фреймворку.

fast-depends — это инжектор аргументов, а не контейнер. Разница принципиальная: он умеет распознать Annotated[T, Inject()] в сигнатуре, дёрнуть зарегистрированный провайдер, подставить результат в вызов. Чего у него нет — иерархии скоупов (APP/REQUEST/…) и общего реестра, в котором провайдеры могут зависеть друг от друга и переиспользоваться между разными точками входа.

Заодно — два слова про AG-UI. Это SSE-протокол, которым сервер шлёт клиенту события агента в реальном времени:

Событие

Когда

RUN_STARTED / RUN_FINISHED

границы прогона

RUN_ERROR

прогон упал

TEXT_MESSAGE_CHUNK

стрим токенов от LLM

TOOL_CALL_START / TOOL_CALL_ARGS

LLM решил вызвать инструмент

TOOL_CALL_RESULT / TOOL_CALL_END

результат / завершение вызова

autogen.beta.ag_ui.AGUIStream — обёртка, которая берёт внутренние события агента и сериализует их в SSE-фреймы. AG-UI поддерживают AG2, LangGraph, Pydantic AI, LlamaIndex, CrewAI и т.д. — так что сам по себе он давно не аргумент «за» какой-то фреймворк. Аргумент здесь — связка AG2.beta + dishka-ag2.

Пакет dishka-ag2: три кирпичика

1. AG2Scope — иерархия скоупов

Семь уровней, но реально вам нужны два:

  • AG2Scope.APP — синглтоны на жизнь приложения: engine SQLAlchemy, sessionmaker, конфиг, OpenAIConfig.

  • AG2Scope.REQUEST — открывается на каждый HTTP-запрос (через ASGI-middleware) и на каждый tool call (декоратором @inject). Внутри живут сессия БД, репозитории, сценарии.

2. DishkaAsyncMiddleware — middleware агента

Подключается к Agent. Перехватывает события агента (on_turn, on_tool_execution, on_llm_call) и готовит контекст для будущего открытия скоупа. Сам скоуп REQUEST middleware не открывает — только складывает текущий Context и ToolCallEvent в pending-слот.

3. @inject — декоратор тулзы

Когда AG2 вызывает инструмент, @inject:

  1. Достаёт корневой контейнер из контекста.

  2. Открывает AG2Scope.REQUEST с данными из pending-слота.

  3. Резолвит все аргументы, помеченные FromDishka[T].

  4. Вызывает функцию.

  5. Закрывает скоуп после возврата (или после исключения).

Иными словами, скоуп живёт ровно столько, сколько выполняется тело инструмента. Сессия БД — тоже.

Плюс к этому пакет даёт AG2Provider — провайдер «из коробки», который умеет отдавать ToolCallEvent, Context, ConversationContainer и т.п. в виде зависимостей. Без него типы из самого AG2 в инструмент не внедрятся, поэтому AG2Provider() в default_providers() обязателен.

Архитектура примера

Жизненный цикл скоупов на одном HTTP-запросе:

HTTP POST /api/chat   │   ▼ AG2ContainerMiddleware ── открывает AG2Scope.REQUEST для HTTP   │   ▼ chat-route ── берёт agent = app.state.agent (синглтон)   │   ▼ AGUIStream(agent).dispatch(run_input)   │      агент решает вызвать инструмент   ▼   DishkaAsyncMiddleware.on_tool_execution   │      stash_request_context: сохраняет Context, ToolCallEvent   │      (скоуп НЕ открывает)   ▼   @inject create_note(uc: FromDishka[CreateNoteUseCase])   │      открывает AG2Scope.REQUEST со stash-данными   │      резолвит uc через цепочку DI   ▼   CreateNoteUseCase → AlchemyNoteRepository → AsyncSession → Postgres   │   ▼ После возврата @inject закрывает REQUEST-скоуп   │      (сессия БД отпущена обратно в пул)   ▼ ToolResultEvent → AGUIStream → SSE TOOL_CALL_RESULT

Контейнер один. Скоуп REQUEST открывается дважды — на HTTP-запрос и на каждый tool call. Это сделано намеренно: каждый tool call — своя транзакция (см. ниже).

Сборка снизу вверх (только важное)

# domain/entities.pyNoteId = NewType("NoteId", UUID)@dataclassclass Note:    id: NoteId    title: str    body: str    created_at: datetime    # gateways/db/note/interface.pyclass NoteRepository(Protocol):    async def create(self, note: Note) -> Note: ...    async def list_notes(self, limit: int = 20) -> list[Note]: ...    async def delete(self, note_id: NoteId) -> bool: ...

Реализация на SQLAlchemy 2.0 — полный код в репозитории. Транзакция в репозитории явно не закрывается — этим занимается UoW.

Сценарий + UnitOfWork

# usecases/uow.pyclass UnitOfWork(Protocol):    async def commit(self) -> None: ...    async def rollback(self) -> None: ...    async def flush(self) -> None: ...

AsyncSession структурно удовлетворяет этому протоколу — отдельный класс не нужен. Это пригодится через шаг.

class CreateNoteUseCase:    def __init__(self, repo: NoteRepository, uow: UnitOfWork) -> None:        self._repo = repo        self._uow = uow    async def execute(self, request: CreateNoteRequest) -> CreateNoteResponse:        note = Note(            id=NoteId(uuid4()),             title=request.title,            body=request.body,             created_at=datetime.now(UTC)        )        created = await self._repo.create(note)        await self._uow.commit()        return CreateNoteResponse(note=created)

Сценарий не знает ни про FastAPI, ни про SQLAlchemy, ни про AG2 — только два протокола. Тестируется в полной изоляции.

DI-провайдер с одним хитрым приёмом

class DatabaseProvider(Provider):    @provide(scope=AG2Scope.APP)    async def provide_async_engine(self, settings: Settings) -> AsyncIterator[AsyncEngine]:        engine = create_async_engine(            make_url(settings.database_url),            pool_size=10,            max_overflow=10,            pool_timeout=10,            pool_pre_ping=True,        )        yield engine        await engine.dispose()    @provide(scope=AG2Scope.APP)    def provide_sessionmaker(self, engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:        return async_sessionmaker[AsyncSession](            bind=engine,            autoflush=False,            autocommit=False,            expire_on_commit=False,        )    @provide(scope=AG2Scope.REQUEST)    async def provide_async_session(        self, pool: async_sessionmaker[AsyncSession]    ) -> AsyncIterator[AnyOf[AsyncSession, UnitOfWork]]:  # ← ключевая строчка        async with pool() as session:            yield session

AnyOf[AsyncSession, UnitOfWork] — Dishka регистрирует один и тот же объект под двумя интерфейсами. Репозиторий получит его как AsyncSession, сценарий — как UnitOfWork. Это одна сессия, поэтому await uow.commit() коммитит транзакцию, в которой репозиторий сделал add и flush.

Сборка контейнера

def default_providers() -> tuple[Provider, ...]:    return (        SettingsProvider(),        DatabaseProvider(),        RepositoryProvider(),        UseCaseProvider(),        AgentProvider(),        AG2Provider(),  # обязателен    )def create_container(*providers, context=None) -> AsyncContainer:    return make_async_container(        *providers,        context=context,        scopes=AG2Scope,  # переключатель на иерархию dishka-ag2    )

scopes=AG2Scope вместо стандартной Scope.{APP,REQUEST,...} Dishka — иначе AG2Scope.REQUEST не подцепится.

Инструмент: @tool

from autogen.beta import Toolkit, toolfrom dishka_ag2 import FromDishka, inject@tool@injectasync def create_note(    uc: FromDishka[CreateNoteUseCase],    title: Annotated[str, Field(description="Short title of the note.")],    body: Annotated[str, Field(description="Full text body (optional).")] = "",) -> dict[str, str]:    """Create a new note in the database."""    response = await uc.execute(CreateNoteRequest(title=title, body=body))    return NoteToolResult.from_entity(response.note)

Три детали, на которых легко споткнуться:

  • Порядок декораторов: сначала @tool, потом @inject.

@inject принимает функцию и возвращает функцию. @tool принимает функцию и возвращает объект FunctionTool — именно его ждёт Agent(tools=[...]). В правильном порядке @inject срабатывает первым (внутренний), потом @tool превращает результат в FunctionTool. Если поменять местами — @tool сначала вернёт FunctionTool, @inject обернёт его как обычный callable, и AG2 такой результат как инструмент не примет.

  • FromDishka[...] скрыт от LLM.

Внутри AG2 для сборки JSON Schema инструмента используется механизм fast-depends, который умеет отличать «зависимости» от «бизнесовых параметров». @inject помечает FromDishka[T] как зависимость — в схему она не попадает. LLM видит только title и body.

  • Docstring и типы — это контракт инструмента для LLM.

То, что в Args:, попадает в описание параметров. Чем точнее формулировка — тем чаще модель попадает в нужный набор аргументов.

Сборка агента

def build_agent(config: OpenAIConfig, container: AsyncContainer) -> Agent:    return Agent(        name="example_assistant",        prompt=SYSTEM_PROMPT,        config=config,        tools=[weather, notes_toolkit()],        middleware=[Middleware(DishkaAsyncMiddleware, container=container)],    )

Агент конструируется один раз при старте и кладётся в app.state.agent. На каждый HTTP-запрос его не пересобирать. Это работает, потому что REQUEST-скоуп открывается на каждый tool call декоратором @inject, а не агентом.

ASGI-middleware для REST-роутов

Стандартная FastAPI-интеграция Dishka не подходит из коробки — её хук на Scope.REQUEST рассчитан на стандартную иерархию, а у нас своя. Поэтому пишем ASGI-middleware на 12 строк:

class AG2ContainerMiddleware:    def __init__(self, app: ASGIApp, container: AsyncContainer) -> None:        self.app = app        self.container = container    async def __call__(self, scope, receive, send) -> None:        if scope["type"] != "http":            await self.app(scope, receive, send)            return        request = Request(scope, receive, send)        async with self.container(            context={Request: request}, scope=AG2Scope.REQUEST,        ) as request_container:            request.state.dishka_container = request_container            await self.app(scope, receive, send)

И эндпоинт /api/chat без единого упоминания контейнера:

@router.post("")async def run_agent(    run_input: Annotated[RunAgentInput, Body()],    request: Request,    accept: Annotated[str | None, Header()] = None,) -> StreamingResponse:    agent: Agent = request.app.state.agent    stream = AGUIStream(agent)    return StreamingResponse(        stream.dispatch(run_input, accept=accept),        media_type=accept or "text/event-stream",    )

Транзакционная семантика tool calls

Главная мысль: каждый tool call выполняется в собственной транзакции. Из этого вытекает несколько следствий, которые стоит проговорить явно.

Видимость данных между tool calls.

LLM может за один прогон вызвать create_note, потом list_notes. Это разные tool calls с разными сессиями. К моменту, когда list_notes начнёт работать, его сессия видит всё, что закоммитила сессия create_note. Поведение естественное и предсказуемое.

Параллельные tool calls.

OpenAI и Anthropic могут вернуть несколько tool calls в одном ответе. AG2.beta отдаёт их параллельно. Каждый — в своём REQUEST-скоупе, со своей сессией из пула. Поэтому pool_size=10, max_overflow=10 — не наугад: это запас на N параллельных тулз. Если у вас агенты на 20 параллельных инструментов — увеличивайте пул.

Если один tool call упал — откатывается только его транзакция.

Если create_note уже закоммитился, а delete_external_thing упал — заметка останется. Общей транзакции на весь прогон нет.

Один сценарий — это один UseCase, а не несколько тулз

Это главная архитектурная мысль статьи. Когда у бизнес-действия есть несколько эффектов — записать в БД, отправить уведомление, опубликовать событие в брокер — всё это входит в один сценарий. Не в десять тулз, и не в одну тулзу с явным session.begin().

Тулза остаётся обёрткой, вся оркестрация — внутри сценария, например:

class CreateNoteUseCase:    def __init__(        self,         notes: NoteRepository,         events: EventBus,        notifier: NotificationService,         uow: UnitOfWork    ) -> None:        self._notes = notes        self._events = events        self._notifier = notifier        self._uow = uow    async def execute(self, request: CreateNoteRequest) -> CreateNoteResponse:        try:            note = await self._notes.create(Note(...))            await self._events.publish(NoteCreated(note_id=note.id))            await self._notifier.send(...)            await self._uow.commit()            return CreateNoteResponse(note=note)        except Exception:            await self._uow.rollback()            raise

raise в конце — обязательный. Если «съесть» исключение, модель решит, что операция прошла, и пойдёт дальше с неверным состоянием.

Антипаттерн, чтобы было нагляднее

# ❌ так не надо@tool@injectasync def create_note(    session: FromDishka[AsyncSession],         # деталь инфраструктуры в API тулзы    notes: FromDishka[NoteRepository],         # скоп оркестрации в тулзе    events: FromDishka[EventBus],              # туда же    title: str,     body: str = "",) -> dict[str, str]: ...

Тулза знает про БД, репозиторий, события — три слоя в одном месте. Любое изменение бизнес-логики потащит правки сюда. Правильно — оставить тулзе одну зависимость на сценарий и положить всю сложность за его границу.

Когда выбирать эту связку

AG-UI как протокол поддерживается широким набором фреймворков — LangGraph, Pydantic AI, LlamaIndex, CrewAI, Mastra и др. Сам AG-UI давно не аргумент «за» один конкретный фреймворк.

Что осознанно выбирается в AG2.beta + dishka-ag2:

  • Готовый мост к Dishka. Если в проекте уже Dishka — dishka-ag2 решает это сразу, без второго параллельного DI и без dependencies={...}-словарей.

  • Multi-agent-оркестрация. Историческая ниша AG2: групповые чаты, hand-off, паттерны «менеджер — исполнители». Если архитектура одного агента перестаёт хватать — растягивается без смены фреймворка.

Заключение

AG2.beta + dishka-ag2 — рабочая связка для ИИ-агента с нормальной архитектурой: один DI-контейнер на проект, типизированное внедрение в тулзы, предсказуемая транзакционная семантика и ноль магических словарей в коде агента.

ссылка на оригинал статьи https://habr.com/ru/articles/1034102/