Привет, Хабр! Меня зовут Владимир и это продолжение статьи про разработку локального кодер-агента.
В первой части мы создали инфраструктуру: подняли контейнер с моделью, настроили Langfuse для трассировки и написали простейшего агента с доступом к MCP-инструментам.
Во второй части мы добавили агенту мозги: планировщик, оценщики, защиту от зацикливания и суммаризацию контекста.
Сегодня мы превратим этот набор в работающий граф. Добавим чекпоинтеры, прерывания и доработаем интерфейс под обновлённую архитектуру.
Сборка графа
Доработку графа начнем с конструктора — в него необходимо добавить чекпоинтер. Дополнительно, для упорядочивания трассировок добавим session_id к LangFuse. Его будем формировать на основе текущей даты и времени, для чего в конструктор добавим временную метку создания экземпляра агента:
class MCPAgent: def __init__(self): self.llm_with_tools: ChatOpenAI | None = None self.tools: list[BaseTool] | None = None self.graph = None self.checkpointer = InMemorySaver() self.lf_handler = CallbackHandler(public_key=settings.langfuse.public_key) self.init_time = datetime.now().strftime('%Y%m%d_%H%M%S')
Инициализация графа не менялась — инициализируем инструменты, биндим на LLM и собираем граф (код далее)
class MCPAgent: # Предыдущий код async def init_graph(self): try: self.tools = await init_tools() except Exception as e: logger.error(f'Ошибка инициализации инструментов: {e}') raise self.llm_with_tools = settings.llm.chat.llm.bind_tools(self.tools, parallel_tool_calls=False) self.graph = self._compile_graph()
Перейдём к условным рёбрам. Начнём с рёбер маршрутиризации подтверждения плана и выполнения шага. В них проверяем флаг is_approved. Если подтверждение есть, то переходим к исполнению плана или суммаризации соответственно. Если нет, то возвращаем на доработку.
class MCPAgent: # Предыдущий код @staticmethod def need_adjust_plan_router(state: AgentState) -> Literal['injector', 'planer']: if state.get('is_approved', False): return 'injector' return 'planer' @staticmethod def need_modify_step_router(state: AgentState) -> Literal['agent_node', 'compressor']: if state.get('is_approved', False): return 'compressor' return 'agent_node'
Теперь маршрутизатор перехода по шагам. В нём мы должны проверить, что план ещё в процессе выполнения. Проверяем сравнением номера шага и длины плана
class MCPAgent: # Предыдущий код @staticmethod def next_step_router(state: AgentState) -> Literal['injector', 'finalizer']: if state['current_step'] < len(state['plan']): return 'injector' return 'finalizer'
Соберём граф. Инициализируем узлы и соединим их согласно схеме из второй части статьи:
class MCPAgent: # Предыдущий код def _compile_graph(self): workflow = StateGraph(AgentState) workflow.add_node('agent_node', AgentNode(llm=self.llm_with_tools).node) workflow.add_node('compressor', ContextCompressorNode().node) workflow.add_node('finalizer', FinalizerNode(llm=settings.llm.chat.llm).node) workflow.add_node('planer', PlanerNode(llm=settings.llm.chat.llm).node) workflow.add_node('plan_solver', PlanSolverNode().node) workflow.add_node('agent_solver', AgentSolverNode().node) workflow.add_node('injector', StepInjectorNode().node) workflow.add_node('tools', ToolNode(self.tools)) workflow.set_entry_point(key='planer') workflow.add_edge(start_key='planer', end_key='plan_solver') workflow.add_conditional_edges(source='plan_solver', path=self.need_adjust_plan_router) workflow.add_edge(start_key='injector', end_key='agent_node') workflow.add_conditional_edges(source='agent_node', path=self.agent_router) workflow.add_edge(start_key='tools', end_key='agent_node') workflow.add_conditional_edges(source='agent_solver', path=self.need_modify_step_router) workflow.add_conditional_edges(source='compressor', path=self.next_step_router) workflow.set_finish_point(key='finalizer') graph = workflow.compile( checkpointer=self.checkpointer, interrupt_after=['planer'], interrupt_before=['agent_solver'], ) return graph
В компилятор передаём два останова — после формирования плана и до «решателя» узла-агента.
Для того, чтобы граф работал с остановами, нам надо два метода — один для первоначального запуска графа, второй — для продолжения общения. Начнем с запуска:
class MCPAgent: # Предыдущий код async def run(self, user_messages: str, request_id: str | None = None) -> dict[str, Any]: self._check_graph_available() trace_id = Langfuse.create_trace_id() return await self._ainvoke_with_tracing( data={'user_request': user_messages, 'user_input': user_messages, 'trace_id': trace_id}, request_id=request_id, trace_id=trace_id, span_name='agent_run')
Служебные методы разберу позже, пока сам алгоритм. Для начала проверяем, что не забыли инициализировать граф. Далее формируем trace_id для Langfuse средствами Langfuse. Без передачи trace_id Langfuse будет создавать отдельные трейсы для каждого возврата в граф. Ну и вызываем служебный метод вызова LLM.
Теперь метод возобновления общения:
class MCPAgent: # Предыдущий код async def resume(self, user_messages: str, request_id: str | None = None) -> dict[str, Any]: self._check_graph_available() trace_id = self._get_trace_id(request_id) return await self._ainvoke_with_tracing( data=Command(update={'user_input': user_messages}), request_id=request_id, trace_id=trace_id, span_name='agent_resume')
Основные отличия — номер трейса в Langfuse мы берём из стейта и наше сообщение пользователя оборачиваем в Command. Переходим к служебным методам.
Первый метод проверяет наличие агента:
class MCPAgent: # Предыдущий код def _check_graph_available(self): if self.graph is None: raise RuntimeError( 'Агент не инициализирован. Запустите `initialize()`.')
Второй нужен для извлечения trace_id из сохраненного стейта:
class MCPAgent: # Предыдущий код def _get_trace_id(self, request_id: str) -> str | None: return self.graph.get_state( {'configurable': {'thread_id': request_id}} ).values.get('trace_id', None)
Про thread_id писал в части про чекпоинтер. Он нужен для того, чтобы граф знал, стейт какой именно сессии надо извлекать из памяти. Формируется на стороне веб-интерфейса. Теперь метод вызова LLM:
class MCPAgent: # Предыдущий код async def _ainvoke_with_tracing( self, data: dict[str, Any] | Command, request_id: str, trace_id: str, span_name: str ) -> dict[str, Any]: with settings.langfuse.client.start_as_current_observation( as_type='span', name=span_name, trace_context={'trace_id': trace_id}, ) as span: result = await self.graph.ainvoke(data, config=self._create_config(request_id)) return result['messages']
Первым делом создаём кастомное наблюдение типа спан и передаём в него параметром наш trace_id. Langfuse сам сгруппирует все наблюдения по trace_id. Далее вызываем метод ainvoke графа, передав в него конфиг:
class MCPAgent: # Предыдущий код def _create_config(self, request_id: str) -> dict[str, Any]: return { 'callbacks': [self.lf_handler], 'metadata': { 'langfuse_session_id': f'docker_session_{self.init_time}', }, 'configurable': {'thread_id': request_id} }
В конфиге мы прописываем Langfuse CallbackHandler для организации наблюдения, в метаданных передаём параметр langfuse_session_id, который в дальнейшем можно использовать для фильтрации трейсов и thread_id для сохранения стейта в чекпоинтере.
Граф готов. Осталось доработать Gradio интерфейс под новые функции и можно релизить)
Интерфейс пользователя
При изменении архитектуры агента (добавление подтверждения пользователем) я пошел на один компромис — подтверждение простым словом “Продолжить”. И чтобы каждый раз его не писать, я решил добавить в интерфейс кнопку “Продолжить”. Для этого пришлось отказаться от gr.ChatInterface и переписать интерфейс на gr.Chatbot:
class MCPCodingAgentApp: def build_interface(self): with gr.Blocks(title='MCP Coding Agent', fill_height=True) as self.demo: gr.Markdown('# MCP Coding Agent') gr.Markdown('Помощник разработчика с доступом к файлам, Git и документации') chatbot = gr.Chatbot(label='Чат с агентом', height=700) request_id_state = gr.State('') with gr.Row(): msg = gr.Textbox( label='Ваше сообщение', placeholder='Введите сообщение или нажмите "Продолжить"', scale=8, container=False ) submit_btn = gr.Button('Отправить', variant='primary') continue_btn = gr.Button('▶ Продолжить', variant='secondary') submit_btn.click( fn=self._respond, inputs=[msg, chatbot, request_id_state], outputs=[chatbot, msg, request_id_state]) msg.submit( fn=self._respond, inputs=[msg, chatbot, request_id_state], outputs=[chatbot, msg, request_id_state]) continue_btn.click( fn=self._continue, inputs=[chatbot, request_id_state], outputs=[chatbot, request_id_state])
После поля чата создаём строку с полем для ввода сообщения и двумя кнопками — “Отправить” и “Продолжить”. Далее назначаем функции-обработчики для наших элементов (для поля ввода тоже, чтобы была отправка по нажатию Enter). Параметры inputs и outputs связывают входы-выходы функции-обработчика с объектами Gradio.
Объект gr.State('') нужен для хранения request_id в рамках сессии.
Так как чат у нас теперь самодельный, то и управлять всем в обработчиках мы должны руками:
class MCPCodingAgentApp: # Предыдущий код async def _respond(self, message: str, history: list, request_id: str): if not request_id: request_id = str(uuid.uuid4()) if not message or not message.strip(): return history, '', request_id phase = self.agent.get_phase(request_id) if phase is None or phase == 'done': result = await self.agent.run(user_messages=message, request_id=request_id) else: result = await self.agent.resume(user_messages=message, request_id=request_id) history.append({'role': 'user', 'content': message}) agent_response = result[-1].content if result else 'Нет ответа' history.append({'role': 'assistant', 'content': agent_response}) return history, '', request_id
В обработчике кнопки “Отправить” и нажатия Enter мы должны предусмотреть, что пользователь решил напечатать слово “Продолжить” вместо нажатия отдельной кнопки. Для этого мы анализируем фазу выполнения агента и, в зависимости от неё, вызываем соответствующий метод агента. Дополнительно создаём первичный request_id, защищаемся от пустого сообщения (как в запросе, так и в ответе) и наполняем историю.
Обработчик продолжения общения немного покороче:
class MCPCodingAgentApp: # Предыдущий код async def _continue(self, history: list, request_id: str): phase = self.agent.get_phase(request_id) if phase is None or phase == 'done': history.append({ 'role': 'assistant', 'content': 'Нет активных задач для продолжения. Задайте новый вопрос.' }) return history, request_id result = await self.agent.resume(user_messages='Продолжить', request_id=request_id) history.append({'role': 'user', 'content': '▶ Продолжить'}) agent_response = result[-1].content if result else 'Нет ответа' history.append({'role': 'assistant', 'content': agent_response}) return history, request_id
Убеждаемся по фазе, что агент в процессе работы, после чего направляем ему сообщение “Продолжить”, иначе сообщаем пользователю, что вы не в цикле. Также сохраняем историю.
Код инициализации и запуска остаётся без изменений.
С кодом всё.
Проверка работоспособности
Запускаем make up.
Теперь можно расслабиться и откинуться на спинку кресла (есть тут те, кто ставил Windows98?😄)
Ждём закачки образов, модели. Для скачивания модели можно использовать uvx (ИМХО в разы быстрее):
HF_TOKEN=<ТОКЕН> HF_HOME=<ПАПКА ДЛЯ МОДЕЛИ> uvx hf download nvidia/Qwen3.6-35B-A3B-NVFP4
В HF_HOME нужно указать туже папку, которую указываем как том для vLLM.
Простой вопрос
Проверку начал с задания попроще, но в несколько шагов:
Напиши функцию проверки на четность. Сохрани её в файл и сделай коммит

Приемлемо. Глянем трейс задачи

Всё как надо — два шага выполнения, три вызова инструментов (write file, git add, git commit).
Вопрос посложнее
Тут нашел в интернете какую-то задачу на знание FastAPI, помеченную как средней сложности
Создай асинхронный dependency get_db, который открывает транзакцию.
Реализуй модель для эндпоинта User с id:uuid и nickname: str
Реализуй эндпоинт GET /users/{user_id}/posts, который возвращает пользователя и его посты в одном запросе к БД (без N+1 проблемы).
Сохрани результаты в отдельные файлы
Сделай коммит изменений
На запрос получил следующий план действий:

Глянем схему трейса

Красота) При выполнении агент даже вызывал инструменты для просмотра содержимого директории в поисках файла model.py. Ну и результат работы агента в рабочей директории:

Итоги
Финальным экспериментом я решил проверить, сможет ли агент написать документацию для самого себя. Дал ему доступ к собственной папке и попросил:
Проанализируй файлы в рабочей папке. Необходимо составить файл readme.md. Файл должен содержать краткое описание проекта и способ запуска
Получилось неожиданно хорошо — агент описал структуру проекта, перечислил основные компоненты и добавил команды для запуска через Docker. Результат можно посмотреть в репозитории: ссылка на README.md. Ручного в файле только про параметр WS.
Код проекта доступен тут
Первая часть с базой и простым ReAct агентом с MCP инструментами тут
Вторая часть в которой показана реализация узлов тут
ссылка на оригинал статьи https://habr.com/ru/articles/1049774/