Cитуация: у вас в проекте давно живёт Dishka. Контейнер настроен, пулы, фабрики UoW. И тут вы добавляете LLM-агента (например, на AG2). А он такой:
«Спасибо, держи
dependencies={'session': session, 'repo': repo}и мою собственную DI-библиотеку. Не благодари».
То есть теперь у вас два параллельных DI-контейнера: один для HTTP-роутов, второй — для тулз агента. Один и тот же сценарий приходится регистрировать дважды.
На каждый /chat-роут — руками собирать словарь зависимостей и передавать его в agent.ask(dependencies={...}). И всё это — рядом с уже существующим Dishka-контейнером, который ровно эту работу делать умеет.
Эта статья — про то, как этого больше не делать. Внутри:
-
зачем агенту вообще нужен полноценный DI и почему AG2 поставляется со встроенным
fast-depends; -
пакет
dishka-ag2— мост между AG2 и Dishka, который превращает два контейнера в один; -
рабочий пример, который я собрал: FastAPI + AG2-агент со SSE-стримом, тулзы ходят в Postgres через типизированный DI;
-
транзакционная семантика tool calls — что происходит, когда модель вызывает три инструмента параллельно и один из них падает.
Полный код примера: github.com/vvlrff/ag2_ag-ui_example — со всеми миграциями, Docker-сборкой и тестами.
Проблема: почему dependencies={…} — это боль
AG2 предлагает прокидывать зависимости в инструменты через словарь — либо на уровне агента, либо на каждый прогон:
# на уровне агента (живёт всю жизнь приложения)agent = Agent( tools=[my_tool], dependencies={"static_dep": something},)# на каждый прогон — актуальная сессия летит сюдаawait agent.ask("...", dependencies={"db_session": session, "repo": repo})
Достаёшь в тулзе через fast-depends-аннотацию — типизация на месте:
@toolasync def my_tool(session: Annotated[AsyncSession, Inject()]) -> ...: ...
Это работает. И до какого-то момента этого хватает. Что не так начинается дальше:
-
Дублирование DI. Если в проекте уже есть Dishka — у вас два контейнера. Один и тот же
CreateNoteUseCaseрегистрируется в Dishka (для HTTP-роутов) и где-то рядом собирается руками дляdependencies={...}. Две политики жизненных циклов. Любая правка в дереве зависимостей — в двух местах. -
Ручная сборка словаря в каждом /chat-роуте. На каждый запрос вы открываете Dishka REQUEST, достаёте оттуда сессию, репо, нужные сервисы, складываете в
dictи передаёте вagent.ask(dependencies=...). Это ровно та плумбинг-работа, которую DI-контейнер должен делать сам. -
Каждое новое требование (логгер, кеш, http-клиент) дописывается в этот же словарь. Через полгода у вас там 15 ключей, и читать сборку этого словаря — отдельное удовольствие.
-
Скоуп — на весь прогон, а не на tool call. Сессия, переданная в
agent.ask, живёт пока работает прогон. Если LLM дёргает три тулзы подряд — все три сидят на одной сессии и одной транзакции. Нужно «каждый tool call — своя транзакция»? Безdishka-ag2это придётся писать руками.
Желаемое API — типизированное внедрение, как в FastAPI:
@tool@injectasync def create_note( title: str, body: str = "", *, uc: FromDishka[CreateNoteUseCase],) -> NoteToolResult: response = await uc.execute(CreateNoteRequest(title=title, body=body)) return NoteToolResult.from_entity(response.note)
LLM в схему инструмента видит только title и body. Сценарий внедряется через DI и живёт ровно столько, сколько работает тулза. Один контейнер на весь проект. Это и есть dishka-ag2.
Под капотом AG2: зачем там fast-depends
Чтобы понять, как dishka-ag2 встраивается в AG2, нужно знать одну деталь: внутри AG2 уже работает DI — библиотека fast-depends, автономный механизм работы с зависимостями в духе FastAPI Depends, но без привязки к веб-фреймворку.
fast-depends — это инжектор аргументов, а не контейнер. Разница принципиальная: он умеет распознать Annotated[T, Inject()] в сигнатуре, дёрнуть зарегистрированный провайдер, подставить результат в вызов. Чего у него нет — иерархии скоупов (APP/REQUEST/…) и общего реестра, в котором провайдеры могут зависеть друг от друга и переиспользоваться между разными точками входа.
Заодно — два слова про AG-UI. Это SSE-протокол, которым сервер шлёт клиенту события агента в реальном времени:
|
Событие |
Когда |
|
|
границы прогона |
|
|
прогон упал |
|
|
стрим токенов от LLM |
|
|
LLM решил вызвать инструмент |
|
|
результат / завершение вызова |
autogen.beta.ag_ui.AGUIStream— обёртка, которая берёт внутренние события агента и сериализует их в SSE-фреймы. AG-UI поддерживают AG2, LangGraph, Pydantic AI, LlamaIndex, CrewAI и т.д. — так что сам по себе он давно не аргумент «за» какой-то фреймворк. Аргумент здесь — связкаAG2.beta + dishka-ag2.
Пакет dishka-ag2: три кирпичика
1. AG2Scope — иерархия скоупов
Семь уровней, но реально вам нужны два:
-
AG2Scope.APP— синглтоны на жизнь приложения: engine SQLAlchemy, sessionmaker, конфиг,OpenAIConfig. -
AG2Scope.REQUEST— открывается на каждый HTTP-запрос (через ASGI-middleware) и на каждый tool call (декоратором@inject). Внутри живут сессия БД, репозитории, сценарии.
2. DishkaAsyncMiddleware — middleware агента
Подключается к Agent. Перехватывает события агента (on_turn, on_tool_execution, on_llm_call) и готовит контекст для будущего открытия скоупа. Сам скоуп REQUEST middleware не открывает — только складывает текущий Context и ToolCallEvent в pending-слот.
3. @inject — декоратор тулзы
Когда AG2 вызывает инструмент, @inject:
-
Достаёт корневой контейнер из контекста.
-
Открывает
AG2Scope.REQUESTс данными из pending-слота. -
Резолвит все аргументы, помеченные
FromDishka[T]. -
Вызывает функцию.
-
Закрывает скоуп после возврата (или после исключения).
Иными словами, скоуп живёт ровно столько, сколько выполняется тело инструмента. Сессия БД — тоже.
Плюс к этому пакет даёт AG2Provider — провайдер «из коробки», который умеет отдавать ToolCallEvent, Context, ConversationContainer и т.п. в виде зависимостей. Без него типы из самого AG2 в инструмент не внедрятся, поэтому AG2Provider() в default_providers() обязателен.
Архитектура примера
Жизненный цикл скоупов на одном HTTP-запросе:
HTTP POST /api/chat │ ▼ AG2ContainerMiddleware ── открывает AG2Scope.REQUEST для HTTP │ ▼ chat-route ── берёт agent = app.state.agent (синглтон) │ ▼ AGUIStream(agent).dispatch(run_input) │ агент решает вызвать инструмент ▼ DishkaAsyncMiddleware.on_tool_execution │ stash_request_context: сохраняет Context, ToolCallEvent │ (скоуп НЕ открывает) ▼ @inject create_note(uc: FromDishka[CreateNoteUseCase]) │ открывает AG2Scope.REQUEST со stash-данными │ резолвит uc через цепочку DI ▼ CreateNoteUseCase → AlchemyNoteRepository → AsyncSession → Postgres │ ▼ После возврата @inject закрывает REQUEST-скоуп │ (сессия БД отпущена обратно в пул) ▼ ToolResultEvent → AGUIStream → SSE TOOL_CALL_RESULT
Контейнер один. Скоуп REQUEST открывается дважды — на HTTP-запрос и на каждый tool call. Это сделано намеренно: каждый tool call — своя транзакция (см. ниже).
Сборка снизу вверх (только важное)
# domain/entities.pyNoteId = NewType("NoteId", UUID)@dataclassclass Note: id: NoteId title: str body: str created_at: datetime # gateways/db/note/interface.pyclass NoteRepository(Protocol): async def create(self, note: Note) -> Note: ... async def list_notes(self, limit: int = 20) -> list[Note]: ... async def delete(self, note_id: NoteId) -> bool: ...
Реализация на SQLAlchemy 2.0 — полный код в репозитории. Транзакция в репозитории явно не закрывается — этим занимается UoW.
Сценарий + UnitOfWork
# usecases/uow.pyclass UnitOfWork(Protocol): async def commit(self) -> None: ... async def rollback(self) -> None: ... async def flush(self) -> None: ...
AsyncSession структурно удовлетворяет этому протоколу — отдельный класс не нужен. Это пригодится через шаг.
class CreateNoteUseCase: def __init__(self, repo: NoteRepository, uow: UnitOfWork) -> None: self._repo = repo self._uow = uow async def execute(self, request: CreateNoteRequest) -> CreateNoteResponse: note = Note( id=NoteId(uuid4()), title=request.title, body=request.body, created_at=datetime.now(UTC) ) created = await self._repo.create(note) await self._uow.commit() return CreateNoteResponse(note=created)
Сценарий не знает ни про FastAPI, ни про SQLAlchemy, ни про AG2 — только два протокола. Тестируется в полной изоляции.
DI-провайдер с одним хитрым приёмом
class DatabaseProvider(Provider): @provide(scope=AG2Scope.APP) async def provide_async_engine(self, settings: Settings) -> AsyncIterator[AsyncEngine]: engine = create_async_engine( make_url(settings.database_url), pool_size=10, max_overflow=10, pool_timeout=10, pool_pre_ping=True, ) yield engine await engine.dispose() @provide(scope=AG2Scope.APP) def provide_sessionmaker(self, engine: AsyncEngine) -> async_sessionmaker[AsyncSession]: return async_sessionmaker[AsyncSession]( bind=engine, autoflush=False, autocommit=False, expire_on_commit=False, ) @provide(scope=AG2Scope.REQUEST) async def provide_async_session( self, pool: async_sessionmaker[AsyncSession] ) -> AsyncIterator[AnyOf[AsyncSession, UnitOfWork]]: # ← ключевая строчка async with pool() as session: yield session
AnyOf[AsyncSession, UnitOfWork] — Dishka регистрирует один и тот же объект под двумя интерфейсами. Репозиторий получит его как AsyncSession, сценарий — как UnitOfWork. Это одна сессия, поэтому await uow.commit() коммитит транзакцию, в которой репозиторий сделал add и flush.
Сборка контейнера
def default_providers() -> tuple[Provider, ...]: return ( SettingsProvider(), DatabaseProvider(), RepositoryProvider(), UseCaseProvider(), AgentProvider(), AG2Provider(), # обязателен )def create_container(*providers, context=None) -> AsyncContainer: return make_async_container( *providers, context=context, scopes=AG2Scope, # переключатель на иерархию dishka-ag2 )
scopes=AG2Scope вместо стандартной Scope.{APP,REQUEST,...} Dishka — иначе AG2Scope.REQUEST не подцепится.
Инструмент: @tool
from autogen.beta import Toolkit, toolfrom dishka_ag2 import FromDishka, inject@tool@injectasync def create_note( uc: FromDishka[CreateNoteUseCase], title: Annotated[str, Field(description="Short title of the note.")], body: Annotated[str, Field(description="Full text body (optional).")] = "",) -> dict[str, str]: """Create a new note in the database.""" response = await uc.execute(CreateNoteRequest(title=title, body=body)) return NoteToolResult.from_entity(response.note)
Три детали, на которых легко споткнуться:
-
Порядок декораторов: сначала
@tool, потом@inject.
@injectпринимает функцию и возвращает функцию.@toolпринимает функцию и возвращает объектFunctionTool— именно его ждётAgent(tools=[...]). В правильном порядке@injectсрабатывает первым (внутренний), потом@toolпревращает результат вFunctionTool. Если поменять местами —@toolсначала вернётFunctionTool,@injectобернёт его как обычный callable, и AG2 такой результат как инструмент не примет.
-
FromDishka[...]скрыт от LLM.
Внутри AG2 для сборки JSON Schema инструмента используется механизм
fast-depends, который умеет отличать «зависимости» от «бизнесовых параметров».@injectпомечаетFromDishka[T]как зависимость — в схему она не попадает. LLM видит толькоtitleиbody.
-
Docstring и типы — это контракт инструмента для LLM.
То, что в
Args:, попадает в описание параметров. Чем точнее формулировка — тем чаще модель попадает в нужный набор аргументов.
Сборка агента
def build_agent(config: OpenAIConfig, container: AsyncContainer) -> Agent: return Agent( name="example_assistant", prompt=SYSTEM_PROMPT, config=config, tools=[weather, notes_toolkit()], middleware=[Middleware(DishkaAsyncMiddleware, container=container)], )
Агент конструируется один раз при старте и кладётся в app.state.agent. На каждый HTTP-запрос его не пересобирать. Это работает, потому что REQUEST-скоуп открывается на каждый tool call декоратором @inject, а не агентом.
ASGI-middleware для REST-роутов
Стандартная FastAPI-интеграция Dishka не подходит из коробки — её хук на Scope.REQUEST рассчитан на стандартную иерархию, а у нас своя. Поэтому пишем ASGI-middleware на 12 строк:
class AG2ContainerMiddleware: def __init__(self, app: ASGIApp, container: AsyncContainer) -> None: self.app = app self.container = container async def __call__(self, scope, receive, send) -> None: if scope["type"] != "http": await self.app(scope, receive, send) return request = Request(scope, receive, send) async with self.container( context={Request: request}, scope=AG2Scope.REQUEST, ) as request_container: request.state.dishka_container = request_container await self.app(scope, receive, send)
И эндпоинт /api/chat без единого упоминания контейнера:
@router.post("")async def run_agent( run_input: Annotated[RunAgentInput, Body()], request: Request, accept: Annotated[str | None, Header()] = None,) -> StreamingResponse: agent: Agent = request.app.state.agent stream = AGUIStream(agent) return StreamingResponse( stream.dispatch(run_input, accept=accept), media_type=accept or "text/event-stream", )
Транзакционная семантика tool calls
Главная мысль: каждый tool call выполняется в собственной транзакции. Из этого вытекает несколько следствий, которые стоит проговорить явно.
Видимость данных между tool calls.
LLM может за один прогон вызвать
create_note, потомlist_notes. Это разные tool calls с разными сессиями. К моменту, когдаlist_notesначнёт работать, его сессия видит всё, что закоммитила сессияcreate_note. Поведение естественное и предсказуемое.
Параллельные tool calls.
OpenAI и Anthropic могут вернуть несколько tool calls в одном ответе. AG2.beta отдаёт их параллельно. Каждый — в своём
REQUEST-скоупе, со своей сессией из пула. Поэтомуpool_size=10, max_overflow=10— не наугад: это запас на N параллельных тулз. Если у вас агенты на 20 параллельных инструментов — увеличивайте пул.
Если один tool call упал — откатывается только его транзакция.
Если
create_noteуже закоммитился, аdelete_external_thingупал — заметка останется. Общей транзакции на весь прогон нет.
Один сценарий — это один UseCase, а не несколько тулз
Это главная архитектурная мысль статьи. Когда у бизнес-действия есть несколько эффектов — записать в БД, отправить уведомление, опубликовать событие в брокер — всё это входит в один сценарий. Не в десять тулз, и не в одну тулзу с явным session.begin().
Тулза остаётся обёрткой, вся оркестрация — внутри сценария, например:
class CreateNoteUseCase: def __init__( self, notes: NoteRepository, events: EventBus, notifier: NotificationService, uow: UnitOfWork ) -> None: self._notes = notes self._events = events self._notifier = notifier self._uow = uow async def execute(self, request: CreateNoteRequest) -> CreateNoteResponse: try: note = await self._notes.create(Note(...)) await self._events.publish(NoteCreated(note_id=note.id)) await self._notifier.send(...) await self._uow.commit() return CreateNoteResponse(note=note) except Exception: await self._uow.rollback() raise
raise в конце — обязательный. Если «съесть» исключение, модель решит, что операция прошла, и пойдёт дальше с неверным состоянием.
Антипаттерн, чтобы было нагляднее
# ❌ так не надо@tool@injectasync def create_note( session: FromDishka[AsyncSession], # деталь инфраструктуры в API тулзы notes: FromDishka[NoteRepository], # скоп оркестрации в тулзе events: FromDishka[EventBus], # туда же title: str, body: str = "",) -> dict[str, str]: ...
Тулза знает про БД, репозиторий, события — три слоя в одном месте. Любое изменение бизнес-логики потащит правки сюда. Правильно — оставить тулзе одну зависимость на сценарий и положить всю сложность за его границу.
Когда выбирать эту связку
AG-UI как протокол поддерживается широким набором фреймворков — LangGraph, Pydantic AI, LlamaIndex, CrewAI, Mastra и др. Сам AG-UI давно не аргумент «за» один конкретный фреймворк.
Что осознанно выбирается в AG2.beta + dishka-ag2:
-
Готовый мост к Dishka. Если в проекте уже Dishka —
dishka-ag2решает это сразу, без второго параллельного DI и безdependencies={...}-словарей. -
Multi-agent-оркестрация. Историческая ниша AG2: групповые чаты, hand-off, паттерны «менеджер — исполнители». Если архитектура одного агента перестаёт хватать — растягивается без смены фреймворка.
Заключение
AG2.beta + dishka-ag2 — рабочая связка для ИИ-агента с нормальной архитектурой: один DI-контейнер на проект, типизированное внедрение в тулзы, предсказуемая транзакционная семантика и ноль магических словарей в коде агента.
ссылка на оригинал статьи https://habr.com/ru/articles/1034102/