Не одним Python едины: Spring AI в разработке MCP‑сервера BitDive

от автора

Многие внутри BitDive привыкли к Python: для анализа данных, прототипирования агентов и построения CI/CD‑утилит этот язык незаменим. Но когда нам потребовался единый масштабируемый MCP‑сервер (Message Control Plane) для обработки и маршрутизации телеметрии в реальном времени, мы решили попробовать нечто более декларативное и «из коробки» готовое к бою. Наш выбор — Spring Boot вместе с новым модулем Spring AI, который позволяет легко описывать инструменты (Tools) и управлять ими через единый SSE‑интерфейс.

1. Введение: почему Spring AI для MCP

1.1. Основные требования к MCP

  1. Высокая пропускная способность. Поток телеметрии достигает десятков тысяч событий в секунду.

  2. Гибкость маршрутизации. Одно событие должно одновременно попадать в ClickHouse, Kafka, Elasticsearch и другие хранилища.

  3. Простота расширения. Каждый новый инструмент (например, AI‑аннотация или трассировка) должен подключаться минимальным набором конфигурации.

  4. Надёжность и отказоустойчивость. Реактивная модель должна автоматически справляться с бэкпрешером и обеспечивать предсказуемые задержки.

1.2. Почему не Python или Go?

  • Python + asyncio. Нам пришлось бы тонко настраивать loop’ы, uvloop и C‑расширения для достижения нужной производительности. Код становился бы сложнее в сопровождении.

  • Go. Высокая производительность, но наша команда преимущественно на Java/Python. Обучение и поддержка Go‑экосистемы потребовали бы времени и ресурсов.

Spring Boot + Spring AI стали компромиссом: знакомый стек, минимальный «боилерплейт» и мощные декларативные возможности.

2. Простота разработки: от конфигурации до готового сервера

2.1. Шаг 1: Конфигурация в application.yml

Полный пример секции Spring AI для MCP‑сервера:

spring:   ai:     mcp:       server:         request-timeout: 180                # Таймаут AI‑запроса в секундах         enabled: true                       # Включить инструменты Spring AI для MCP         type: ASYNC                         # Режим работы: ASYNC или SYNC         name: bitdive-mcp                   # Имя сервера в списке доступных инструментов         version: 1.0.0                      # Версия сервера (может использоваться при развёртывании)         instructions: |                     # Описание сервера (передаётся клиентам)           Этот сервер предоставляет инструменты           для работы с системой мониторинга BitDive.          # Настройка SSE (Server‑Sent Events)         sse-endpoint: /sse                  # Точка подписки на события         sse-message-endpoint: /mcp/message  # Эндпоинт для приёма команд  # Пример настроек Netty для повышения производительности server:   port: 8080   reactive:     max-http-header-size: 16384     idle-timeout: 120s   netty:     max-connections: 10000 

Объяснение ключевых параметров:

  • type: ASYNC → клиент отправляет команду, а ответы инструментов приходят через SSE.

  • name и version → видны в списке сервисов у клиентов, облегчают роутинг и версионирование.

  • instructions → автоматически отображаются в помощи (help) у клиента.

2.2. Шаг 2: Подключаем зависимость

В pom.xml:

<dependency>   <groupId>org.springframework.ai</groupId>   <artifactId>spring-ai</artifactId>   <version>1.0.0</version> </dependency> 

После этого Spring Boot автоматически активирует балансировку Reactor Netty, настройку SSE и всё, что нужно для работы Spring AI.

2.3. Шаг 3: Описание Tools (инструментов)

С помощью аннотаций @Tool и @ToolParam описываем любой сервисный слой как набор команд:

@Service @RequiredArgsConstructor @Slf4j public class HeapMapTools {      private final MonitoringHeapMapComponent heapMapComponent;     private final ApiKeyComponent apiKeyComponent;      @Tool(       name = "getCurrentHeapMapAllSystem",       description = "Возвращает метрики производительности для всех систем"     )     public List<Map<String, Object>> getCurrentHeapMap(             @ToolParam(description = "API‑ключ для доступа") String apiKey     ) {         var user = apiKeyComponent.decryptApiKey(apiKey);         return heapMapComponent.getCurrentHeapMap(user);     }      @Tool(       name = "getCurrentHeapMapForModule",       description = "Метрики по конкретному модулю"     )     public List<Map<String, Object>> getCurrentHeapMapForModule(             @ToolParam(description = "Имя модуля") String module,             @ToolParam(description = "API‑ключ для доступа") String apiKey     ) {         var user = apiKeyComponent.decryptApiKey(apiKey);         return heapMapComponent.getCurrentHeapMap(module, user);     }      // Аналогично: для сервиса и для класса... } 

Почему так удобно? Вы описываете чисто бизнес‑логику, интерфейс автоматически становится доступен через SSE — без ручной регистрации или сложных конфигов.

2.4. Шаг 4: Регистрируем коллбэки инструментов

В конфигурационном классе объединяем все Beans с инструментами:

@Configuration public class ToolCallbackConfig {     @Bean     public ToolCallbackProvider monitoringTools(             TraceTools traceTools,             HeapMapTools heapMapTools,             LastCallTools lastCallTools     ) {         return MethodToolCallbackProvider.builder()                 .toolObjects(                     traceTools,                     heapMapTools,                     lastCallTools                 )                 .build();     } } 

После старта приложения под капотом Spring AI:

  • Сканирует все методы с @Tool и создаёт мэппинг

  • Запускает Netty‑сервер, слушающий /sse

  • На /mcp/message принимает команды и вызывает соответствующий метод

3. Подробности работы и решение проблем

3.1. Реактивный поток и backpressure

Spring WebFlux + Reactor Netty автоматически обрабатывает backpressure. Если клиент присылает события слишком быстро, Netty регулирует скорость чтения, а Reactor вовсе не выделяет лишние буферы.

Пример приёма и параллельной маршрутизации:

public Mono<Void> ingest(ServerRequest req) {     return req.bodyToFlux(TelemetryEvent.class)         .flatMap(event -> Flux.merge(             writeToClickHouse(event),             sendToKafka(event),             enrichWithAI(event)         ))         .then(); } 

Каждый flatMap — это отдельный асинхронный поток, а Flux.merge гарантирует независимость каналов.

3.2. Масштабирование и отказоустойчивость

  • Вертикальное масштабирование: настройка Netty позволяет увеличить число соединений до десятков тысяч.

  • Горизонтальное: несколько инстансов под балансировщиком Kubernetes принимают на себя часть пула SSE‑клиентов.

3.3. Расширение через AI‑инструменты

Допустим, нужно добавить команду, которая на основе события даёт рекомендацию по оптимизации GC-параметров через LLM:

@Tool(   name = "recommendGC",   description = "Рекомендации по GC конфигурации" ) public String recommendGC(         @ToolParam("Текущие метрики heap") String heapMetrics,         @ToolParam("API‑ключ") String apiKey ) {     var user = apiKeyComponent.decryptApiKey(apiKey);     String prompt = "Дай советы по GC конфигурации на основе: " + heapMetrics;     return aiClient.generateText(prompt).getText(); } 

Без изменения сети или инфраструктуры сервер сразу начнёт обрабатывать эту новую команду.

3.5. Проблема разрыва соединений и решение heartbeat

При использовании SSE-клиенты иногда теряли соединение: в Java SDK для MCP отсутствовала поддержка механизма ping–pong, из-за чего промежуточные балансировщики (NGINX, облачные LB) закрывали «холостые» соединения после таймаута.

Чтобы поддерживать активность сессий, мы добавили в провайдер SSE-соединений метод sendHeartbeat(), который посылает «ping» всем подключённым клиентам без ожидания ответа:

/**  * Sends a heartbeat (ping) to all connected clients to keep connections alive. This  * method sends ping notifications to all active sessions without expecting a  * response, which helps prevent connection timeouts.  * @return A Mono that completes when heartbeat has been sent to all sessions  */ public Mono<Void> sendHeartbeat() {     if (sessions.isEmpty()) {         logger.debug("No active sessions to send heartbeat to");         return Mono.empty();     }      logger.debug("Sending heartbeat to {} active sessions", sessions.size());      return Flux.fromIterable(sessions.values())         .flatMap(session -> session.sendNotification(McpSchema.METHOD_PING, null)             .doOnSuccess(v -> logger.trace("Heartbeat sent successfully to session {}", session.getId()))             .doOnError(e -> logger.warn("Heartbeat failed for session {}: {}", session.getId(), e.getMessage()))             .onErrorComplete()) // Continue with other sessions even if one fails         .then(); } 

И регистрируем периодическую отправку:

@Bean public Disposable heartbeatLoop(WebFluxSseServerTransportProvider provider) {     return Flux.interval(Duration.ofSeconds(40))             .flatMap(t -> provider.sendHeartbeat())             .subscribe(); } 

Итоги

  1. Декларативность. Никаких ручных WebClient, JSON‑парсинга и сложных конфигураций.

  2. Реактивность. Автоматический backpressure, независимые потоки, высокая пропускная способность.

  3. Расширяемость. Новые команды подключаются одной аннотацией.

  4. Интеграция AI. Spring AI из коробки работает с LLM — внутренняя логика клиента спрятана за абстракцией.

Наш MCP‑сервер на Spring AI стал ядром системы маршрутизации и инструментов, заменив десятки скриптов и микросервисов. Теперь телеметрия обрабатывается быстро, надёжно и просто расширяется — кроме Python, мы успешно дополнили стек сильным Java‑решением.


ссылка на оригинал статьи https://habr.com/ru/articles/923056/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *