Многие внутри BitDive привыкли к Python: для анализа данных, прототипирования агентов и построения CI/CD‑утилит этот язык незаменим. Но когда нам потребовался единый масштабируемый MCP‑сервер (Message Control Plane) для обработки и маршрутизации телеметрии в реальном времени, мы решили попробовать нечто более декларативное и «из коробки» готовое к бою. Наш выбор — Spring Boot вместе с новым модулем Spring AI, который позволяет легко описывать инструменты (Tools) и управлять ими через единый SSE‑интерфейс.
1. Введение: почему Spring AI для MCP
1.1. Основные требования к MCP
-
Высокая пропускная способность. Поток телеметрии достигает десятков тысяч событий в секунду.
-
Гибкость маршрутизации. Одно событие должно одновременно попадать в ClickHouse, Kafka, Elasticsearch и другие хранилища.
-
Простота расширения. Каждый новый инструмент (например, AI‑аннотация или трассировка) должен подключаться минимальным набором конфигурации.
-
Надёжность и отказоустойчивость. Реактивная модель должна автоматически справляться с бэкпрешером и обеспечивать предсказуемые задержки.
1.2. Почему не Python или Go?
-
Python + asyncio. Нам пришлось бы тонко настраивать loop’ы, uvloop и C‑расширения для достижения нужной производительности. Код становился бы сложнее в сопровождении.
-
Go. Высокая производительность, но наша команда преимущественно на Java/Python. Обучение и поддержка Go‑экосистемы потребовали бы времени и ресурсов.
Spring Boot + Spring AI стали компромиссом: знакомый стек, минимальный «боилерплейт» и мощные декларативные возможности.
2. Простота разработки: от конфигурации до готового сервера
2.1. Шаг 1: Конфигурация в application.yml
Полный пример секции Spring AI для MCP‑сервера:
spring: ai: mcp: server: request-timeout: 180 # Таймаут AI‑запроса в секундах enabled: true # Включить инструменты Spring AI для MCP type: ASYNC # Режим работы: ASYNC или SYNC name: bitdive-mcp # Имя сервера в списке доступных инструментов version: 1.0.0 # Версия сервера (может использоваться при развёртывании) instructions: | # Описание сервера (передаётся клиентам) Этот сервер предоставляет инструменты для работы с системой мониторинга BitDive. # Настройка SSE (Server‑Sent Events) sse-endpoint: /sse # Точка подписки на события sse-message-endpoint: /mcp/message # Эндпоинт для приёма команд # Пример настроек Netty для повышения производительности server: port: 8080 reactive: max-http-header-size: 16384 idle-timeout: 120s netty: max-connections: 10000
Объяснение ключевых параметров:
type: ASYNC→ клиент отправляет команду, а ответы инструментов приходят через SSE.
nameиversion→ видны в списке сервисов у клиентов, облегчают роутинг и версионирование.
instructions→ автоматически отображаются в помощи (help) у клиента.
2.2. Шаг 2: Подключаем зависимость
В pom.xml:
<dependency> <groupId>org.springframework.ai</groupId> <artifactId>spring-ai</artifactId> <version>1.0.0</version> </dependency>
После этого Spring Boot автоматически активирует балансировку Reactor Netty, настройку SSE и всё, что нужно для работы Spring AI.
2.3. Шаг 3: Описание Tools (инструментов)
С помощью аннотаций @Tool и @ToolParam описываем любой сервисный слой как набор команд:
@Service @RequiredArgsConstructor @Slf4j public class HeapMapTools { private final MonitoringHeapMapComponent heapMapComponent; private final ApiKeyComponent apiKeyComponent; @Tool( name = "getCurrentHeapMapAllSystem", description = "Возвращает метрики производительности для всех систем" ) public List<Map<String, Object>> getCurrentHeapMap( @ToolParam(description = "API‑ключ для доступа") String apiKey ) { var user = apiKeyComponent.decryptApiKey(apiKey); return heapMapComponent.getCurrentHeapMap(user); } @Tool( name = "getCurrentHeapMapForModule", description = "Метрики по конкретному модулю" ) public List<Map<String, Object>> getCurrentHeapMapForModule( @ToolParam(description = "Имя модуля") String module, @ToolParam(description = "API‑ключ для доступа") String apiKey ) { var user = apiKeyComponent.decryptApiKey(apiKey); return heapMapComponent.getCurrentHeapMap(module, user); } // Аналогично: для сервиса и для класса... }
Почему так удобно? Вы описываете чисто бизнес‑логику, интерфейс автоматически становится доступен через SSE — без ручной регистрации или сложных конфигов.
2.4. Шаг 4: Регистрируем коллбэки инструментов
В конфигурационном классе объединяем все Beans с инструментами:
@Configuration public class ToolCallbackConfig { @Bean public ToolCallbackProvider monitoringTools( TraceTools traceTools, HeapMapTools heapMapTools, LastCallTools lastCallTools ) { return MethodToolCallbackProvider.builder() .toolObjects( traceTools, heapMapTools, lastCallTools ) .build(); } }
После старта приложения под капотом Spring AI:
-
Сканирует все методы с
@Toolи создаёт мэппинг -
Запускает Netty‑сервер, слушающий
/sse -
На
/mcp/messageпринимает команды и вызывает соответствующий метод
3. Подробности работы и решение проблем
3.1. Реактивный поток и backpressure
Spring WebFlux + Reactor Netty автоматически обрабатывает backpressure. Если клиент присылает события слишком быстро, Netty регулирует скорость чтения, а Reactor вовсе не выделяет лишние буферы.
Пример приёма и параллельной маршрутизации:
public Mono<Void> ingest(ServerRequest req) { return req.bodyToFlux(TelemetryEvent.class) .flatMap(event -> Flux.merge( writeToClickHouse(event), sendToKafka(event), enrichWithAI(event) )) .then(); }
Каждый flatMap — это отдельный асинхронный поток, а Flux.merge гарантирует независимость каналов.
3.2. Масштабирование и отказоустойчивость
-
Вертикальное масштабирование: настройка Netty позволяет увеличить число соединений до десятков тысяч.
-
Горизонтальное: несколько инстансов под балансировщиком Kubernetes принимают на себя часть пула SSE‑клиентов.
3.3. Расширение через AI‑инструменты
Допустим, нужно добавить команду, которая на основе события даёт рекомендацию по оптимизации GC-параметров через LLM:
@Tool( name = "recommendGC", description = "Рекомендации по GC конфигурации" ) public String recommendGC( @ToolParam("Текущие метрики heap") String heapMetrics, @ToolParam("API‑ключ") String apiKey ) { var user = apiKeyComponent.decryptApiKey(apiKey); String prompt = "Дай советы по GC конфигурации на основе: " + heapMetrics; return aiClient.generateText(prompt).getText(); }
Без изменения сети или инфраструктуры сервер сразу начнёт обрабатывать эту новую команду.
3.5. Проблема разрыва соединений и решение heartbeat
При использовании SSE-клиенты иногда теряли соединение: в Java SDK для MCP отсутствовала поддержка механизма ping–pong, из-за чего промежуточные балансировщики (NGINX, облачные LB) закрывали «холостые» соединения после таймаута.
Чтобы поддерживать активность сессий, мы добавили в провайдер SSE-соединений метод sendHeartbeat(), который посылает «ping» всем подключённым клиентам без ожидания ответа:
/** * Sends a heartbeat (ping) to all connected clients to keep connections alive. This * method sends ping notifications to all active sessions without expecting a * response, which helps prevent connection timeouts. * @return A Mono that completes when heartbeat has been sent to all sessions */ public Mono<Void> sendHeartbeat() { if (sessions.isEmpty()) { logger.debug("No active sessions to send heartbeat to"); return Mono.empty(); } logger.debug("Sending heartbeat to {} active sessions", sessions.size()); return Flux.fromIterable(sessions.values()) .flatMap(session -> session.sendNotification(McpSchema.METHOD_PING, null) .doOnSuccess(v -> logger.trace("Heartbeat sent successfully to session {}", session.getId())) .doOnError(e -> logger.warn("Heartbeat failed for session {}: {}", session.getId(), e.getMessage())) .onErrorComplete()) // Continue with other sessions even if one fails .then(); }
И регистрируем периодическую отправку:
@Bean public Disposable heartbeatLoop(WebFluxSseServerTransportProvider provider) { return Flux.interval(Duration.ofSeconds(40)) .flatMap(t -> provider.sendHeartbeat()) .subscribe(); }
Итоги
-
Декларативность. Никаких ручных WebClient, JSON‑парсинга и сложных конфигураций.
-
Реактивность. Автоматический backpressure, независимые потоки, высокая пропускная способность.
-
Расширяемость. Новые команды подключаются одной аннотацией.
-
Интеграция AI. Spring AI из коробки работает с LLM — внутренняя логика клиента спрятана за абстракцией.
Наш MCP‑сервер на Spring AI стал ядром системы маршрутизации и инструментов, заменив десятки скриптов и микросервисов. Теперь телеметрия обрабатывается быстро, надёжно и просто расширяется — кроме Python, мы успешно дополнили стек сильным Java‑решением.
ссылка на оригинал статьи https://habr.com/ru/articles/923056/
Добавить комментарий