Мой личный джуниор. Часть 3. Учим агента ждать

от автора

Привет, Хабр! Меня зовут Владимир и это продолжение статьи про разработку локального кодер-агента.

В первой части мы создали инфраструктуру: подняли контейнер с моделью, настроили Langfuse для трассировки и написали простейшего агента с доступом к MCP-инструментам.

Во второй части мы добавили агенту мозги: планировщик, оценщики, защиту от зацикливания и суммаризацию контекста.

Сегодня мы превратим этот набор в работающий граф. Добавим чекпоинтеры, прерывания и доработаем интерфейс под обновлённую архитектуру.

Сборка графа

Доработку графа начнем с конструктора — в него необходимо добавить чекпоинтер. Дополнительно, для упорядочивания трассировок добавим session_id к LangFuse. Его будем формировать на основе текущей даты и времени, для чего в конструктор добавим временную метку создания экземпляра агента:

class MCPAgent:    def __init__(self):        self.llm_with_tools: ChatOpenAI | None = None        self.tools: list[BaseTool] | None = None        self.graph = None        self.checkpointer = InMemorySaver()        self.lf_handler = CallbackHandler(public_key=settings.langfuse.public_key)        self.init_time = datetime.now().strftime('%Y%m%d_%H%M%S')

Инициализация графа не менялась — инициализируем инструменты, биндим на LLM и собираем граф (код далее)

class MCPAgent:    # Предыдущий код    async def init_graph(self):        try:            self.tools = await init_tools()        except Exception as e:            logger.error(f'Ошибка инициализации инструментов: {e}')            raise        self.llm_with_tools = settings.llm.chat.llm.bind_tools(self.tools, parallel_tool_calls=False)        self.graph = self._compile_graph()

Перейдём к условным рёбрам. Начнём с рёбер маршрутиризации подтверждения плана и выполнения шага. В них проверяем флаг is_approved. Если подтверждение есть, то переходим к исполнению плана или суммаризации соответственно. Если нет, то возвращаем на доработку.

class MCPAgent:    # Предыдущий код        @staticmethod    def need_adjust_plan_router(state: AgentState) -> Literal['injector', 'planer']:        if state.get('is_approved', False):            return 'injector'        return 'planer'    @staticmethod    def need_modify_step_router(state: AgentState) -> Literal['agent_node', 'compressor']:        if state.get('is_approved', False):            return 'compressor'        return 'agent_node'

Теперь маршрутизатор перехода по шагам. В нём мы должны проверить, что план ещё в процессе выполнения. Проверяем сравнением номера шага и длины плана

class MCPAgent:    # Предыдущий код    @staticmethod    def next_step_router(state: AgentState) -> Literal['injector', 'finalizer']:        if state['current_step'] < len(state['plan']):            return 'injector'        return 'finalizer'

Соберём граф. Инициализируем узлы и соединим их согласно схеме из второй части статьи:

class MCPAgent:    # Предыдущий код    def _compile_graph(self):        workflow = StateGraph(AgentState)        workflow.add_node('agent_node', AgentNode(llm=self.llm_with_tools).node)        workflow.add_node('compressor', ContextCompressorNode().node)        workflow.add_node('finalizer', FinalizerNode(llm=settings.llm.chat.llm).node)        workflow.add_node('planer', PlanerNode(llm=settings.llm.chat.llm).node)        workflow.add_node('plan_solver', PlanSolverNode().node)        workflow.add_node('agent_solver', AgentSolverNode().node)        workflow.add_node('injector', StepInjectorNode().node)        workflow.add_node('tools', ToolNode(self.tools))        workflow.set_entry_point(key='planer')        workflow.add_edge(start_key='planer', end_key='plan_solver')        workflow.add_conditional_edges(source='plan_solver', path=self.need_adjust_plan_router)        workflow.add_edge(start_key='injector', end_key='agent_node')        workflow.add_conditional_edges(source='agent_node', path=self.agent_router)        workflow.add_edge(start_key='tools', end_key='agent_node')        workflow.add_conditional_edges(source='agent_solver', path=self.need_modify_step_router)        workflow.add_conditional_edges(source='compressor', path=self.next_step_router)        workflow.set_finish_point(key='finalizer')        graph = workflow.compile(            checkpointer=self.checkpointer,            interrupt_after=['planer'],            interrupt_before=['agent_solver'],        )        return graph

В компилятор передаём два останова — после формирования плана и до «решателя» узла-агента.

Для того, чтобы граф работал с остановами, нам надо два метода — один для первоначального запуска графа, второй — для продолжения общения. Начнем с запуска:

class MCPAgent:    # Предыдущий код    async def run(self, user_messages: str, request_id: str | None = None) -> dict[str, Any]:        self._check_graph_available()        trace_id = Langfuse.create_trace_id()        return await self._ainvoke_with_tracing(            data={'user_request': user_messages, 'user_input': user_messages, 'trace_id': trace_id},            request_id=request_id, trace_id=trace_id, span_name='agent_run')

Служебные методы разберу позже, пока сам алгоритм. Для начала проверяем, что не забыли инициализировать граф. Далее формируем trace_id для Langfuse средствами Langfuse. Без передачи trace_id Langfuse будет создавать отдельные трейсы для каждого возврата в граф. Ну и вызываем служебный метод вызова LLM.

Теперь метод возобновления общения:

class MCPAgent:    # Предыдущий код    async def resume(self, user_messages: str, request_id: str | None = None) -> dict[str, Any]:        self._check_graph_available()        trace_id = self._get_trace_id(request_id)        return await self._ainvoke_with_tracing(            data=Command(update={'user_input': user_messages}),            request_id=request_id, trace_id=trace_id, span_name='agent_resume')

Основные отличия — номер трейса в Langfuse мы берём из стейта и наше сообщение пользователя оборачиваем в Command. Переходим к служебным методам.

Первый метод проверяет наличие агента:

class MCPAgent:    # Предыдущий код    def _check_graph_available(self):        if self.graph is None:            raise RuntimeError(                'Агент не инициализирован. Запустите `initialize()`.')

Второй нужен для извлечения trace_id из сохраненного стейта:

class MCPAgent:    # Предыдущий код    def _get_trace_id(self, request_id: str) -> str | None:        return self.graph.get_state(            {'configurable': {'thread_id': request_id}}        ).values.get('trace_id', None)

Про thread_id писал в части про чекпоинтер. Он нужен для того, чтобы граф знал, стейт какой именно сессии надо извлекать из памяти. Формируется на стороне веб-интерфейса. Теперь метод вызова LLM:

class MCPAgent:    # Предыдущий код    async def _ainvoke_with_tracing(            self, data: dict[str, Any] | Command, request_id: str, trace_id: str, span_name: str    ) -> dict[str, Any]:        with settings.langfuse.client.start_as_current_observation(                as_type='span',                name=span_name,                trace_context={'trace_id': trace_id},        ) as span:            result = await self.graph.ainvoke(data, config=self._create_config(request_id))            return result['messages']

Первым делом создаём кастомное наблюдение типа спан и передаём в него параметром наш trace_id. Langfuse сам сгруппирует все наблюдения по trace_id. Далее вызываем метод ainvoke графа, передав в него конфиг:

class MCPAgent:    # Предыдущий код    def _create_config(self, request_id: str) -> dict[str, Any]:        return {            'callbacks': [self.lf_handler],            'metadata': {                'langfuse_session_id': f'docker_session_{self.init_time}',            },            'configurable': {'thread_id': request_id}        }

В конфиге мы прописываем Langfuse CallbackHandler для организации наблюдения, в метаданных передаём параметр langfuse_session_id, который в дальнейшем можно использовать для фильтрации трейсов и thread_id для сохранения стейта в чекпоинтере.

Граф готов. Осталось доработать Gradio интерфейс под новые функции и можно релизить)

Интерфейс пользователя

При изменении архитектуры агента (добавление подтверждения пользователем) я пошел на один компромис — подтверждение простым словом “Продолжить”. И чтобы каждый раз его не писать, я решил добавить в интерфейс кнопку “Продолжить”. Для этого пришлось отказаться от gr.ChatInterface и переписать интерфейс на gr.Chatbot:

class MCPCodingAgentApp:    def build_interface(self):        with gr.Blocks(title='MCP Coding Agent', fill_height=True) as self.demo:            gr.Markdown('# MCP Coding Agent')            gr.Markdown('Помощник разработчика с доступом к файлам, Git и документации')            chatbot = gr.Chatbot(label='Чат с агентом', height=700)            request_id_state = gr.State('')            with gr.Row():                msg = gr.Textbox(                    label='Ваше сообщение',                    placeholder='Введите сообщение или нажмите "Продолжить"',                    scale=8,                    container=False                )                submit_btn = gr.Button('Отправить', variant='primary')                continue_btn = gr.Button('▶ Продолжить', variant='secondary')            submit_btn.click(                fn=self._respond,                inputs=[msg, chatbot, request_id_state],                outputs=[chatbot, msg, request_id_state])            msg.submit(                fn=self._respond,                inputs=[msg, chatbot, request_id_state],                outputs=[chatbot, msg, request_id_state])            continue_btn.click(                fn=self._continue,                inputs=[chatbot, request_id_state],                outputs=[chatbot, request_id_state])

После поля чата создаём строку с полем для ввода сообщения и двумя кнопками — “Отправить” и “Продолжить”. Далее назначаем функции-обработчики для наших элементов (для поля ввода тоже, чтобы была отправка по нажатию Enter). Параметры inputs и outputs связывают входы-выходы функции-обработчика с объектами Gradio.

Объект gr.State('') нужен для хранения request_id в рамках сессии.

Так как чат у нас теперь самодельный, то и управлять всем в обработчиках мы должны руками:

class MCPCodingAgentApp:    # Предыдущий код    async def _respond(self, message: str, history: list, request_id: str):        if not request_id:            request_id = str(uuid.uuid4())        if not message or not message.strip():            return history, '', request_id        phase = self.agent.get_phase(request_id)        if phase is None or phase == 'done':            result = await self.agent.run(user_messages=message, request_id=request_id)        else:            result = await self.agent.resume(user_messages=message, request_id=request_id)        history.append({'role': 'user', 'content': message})        agent_response = result[-1].content if result else 'Нет ответа'        history.append({'role': 'assistant', 'content': agent_response})        return history, '', request_id

В обработчике кнопки “Отправить” и нажатия Enter мы должны предусмотреть, что пользователь решил напечатать слово “Продолжить” вместо нажатия отдельной кнопки. Для этого мы анализируем фазу выполнения агента и, в зависимости от неё, вызываем соответствующий метод агента. Дополнительно создаём первичный request_id, защищаемся от пустого сообщения (как в запросе, так и в ответе) и наполняем историю.

Обработчик продолжения общения немного покороче:

class MCPCodingAgentApp:    # Предыдущий код    async def _continue(self, history: list, request_id: str):        phase = self.agent.get_phase(request_id)        if phase is None or phase == 'done':            history.append({                'role': 'assistant',                'content': 'Нет активных задач для продолжения. Задайте новый вопрос.'            })            return history, request_id        result = await self.agent.resume(user_messages='Продолжить', request_id=request_id)        history.append({'role': 'user', 'content': '▶ Продолжить'})        agent_response = result[-1].content if result else 'Нет ответа'        history.append({'role': 'assistant', 'content': agent_response})        return history, request_id

Убеждаемся по фазе, что агент в процессе работы, после чего направляем ему сообщение “Продолжить”, иначе сообщаем пользователю, что вы не в цикле. Также сохраняем историю.

Код инициализации и запуска остаётся без изменений.

С кодом всё.

Проверка работоспособности

Запускаем make up.

Теперь можно расслабиться и откинуться на спинку кресла (есть тут те, кто ставил Windows98?😄)

Ждём закачки образов, модели. Для скачивания модели можно использовать uvx (ИМХО в разы быстрее):

HF_TOKEN=<ТОКЕН> HF_HOME=<ПАПКА ДЛЯ МОДЕЛИ> uvx hf download nvidia/Qwen3.6-35B-A3B-NVFP4

В HF_HOME нужно указать туже папку, которую указываем как том для vLLM.

Простой вопрос

Проверку начал с задания попроще, но в несколько шагов:

Напиши функцию проверки на четность. Сохрани её в файл и сделай коммит

Приемлемо. Глянем трейс задачи

Всё как надо — два шага выполнения, три вызова инструментов (write file, git add, git commit).

Вопрос посложнее

Тут нашел в интернете какую-то задачу на знание FastAPI, помеченную как средней сложности

Создай асинхронный dependency get_db, который открывает транзакцию.

Реализуй модель для эндпоинта User с id:uuid и nickname: str

Реализуй эндпоинт GET /users/{user_id}/posts, который возвращает пользователя и его посты в одном запросе к БД (без N+1 проблемы).

Сохрани результаты в отдельные файлы

Сделай коммит изменений

На запрос получил следующий план действий:

Глянем схему трейса

Красота) При выполнении агент даже вызывал инструменты для просмотра содержимого директории в поисках файла model.py. Ну и результат работы агента в рабочей директории:

Итоги

Финальным экспериментом я решил проверить, сможет ли агент написать документацию для самого себя. Дал ему доступ к собственной папке и попросил:

Проанализируй файлы в рабочей папке. Необходимо составить файл readme.md. Файл должен содержать краткое описание проекта и способ запуска

Получилось неожиданно хорошо — агент описал структуру проекта, перечислил основные компоненты и добавил команды для запуска через Docker. Результат можно посмотреть в репозитории: ссылка на README.md. Ручного в файле только про параметр WS.

Код проекта доступен тут

Первая часть с базой и простым ReAct агентом с MCP инструментами тут

Вторая часть в которой показана реализация узлов тут

ссылка на оригинал статьи https://habr.com/ru/articles/1049774/