Балансировка нагрузки LLM через Nginx

от автора

Исходный код, разобранный в статье, опубликован в этом репозитории

В интернете существует множество примеров, которые позволяют подключить ChatGPT 3.5 без инструментов к телеграм боту. Однако, когда речь заходит о большом количестве пользователей, не существуют примеров распределения нагрузки по нескольким процессам: все туториалы в интернете запускают монолит с одной репликой

https://github.com/telegraf/telegraf/issues/423

Так же, на практике работы с NodeJS, я сталкивался с проблемой, когда много Promise в статусе pending замедляют работу приложения нагрузкой на сборщик мусора. Добавив сторонние инструменты (дав ChatGPT вызывать внешнии функции с запросами к базе данных) потребуется думать как минимум о создании реплик монолита, чтобы раздувания очереди ожидания данных из базы

Архитектура приложения

Для балансировки нагрузки самым очевидным инструментом является Nginx upstream — инструмент получает WebSocket соединение нового клиента на порт 80 и в порядке очереди проксирует его на 8081, 8082, …, 8085 зависимо от количества реплик. Если клиент не проявляет активность 15 минут, соединение прирывается, если будет новое сообщение, оно создастся заного.

https://nginx.org/en/docs/http/ngx_http_upstream_module.html

Реплики сохраняют историю переписки в Redis, что позволяет воссоздать контекст не смотря на то, что сообщения обрабатывает новый процесс. Создавать реплики будет PM2 — наиболее нативный способ для приложений стека NodeJS

https://pm2.io/docs/plus/overview/

Так же, используя PM2, за 40$ в месяц можно купить готовое уведомление о инцидентах по Slack / Email, мониторинг сервера по стандартным метрикам таким как CPUs: cores, hardware threads, virtual threads, Memory: capacity, Network interfaces, Storage devices: I/O, capacity, время отклика и тд. Это экономит деньги на Dev Ops до самоокупаемости проекта.

Файлы конфигурации

Чтобы не специализировать Linux на машине разработчика под один проект, обернем Nginx в Docker. Для этого, напишем docker-compose.yaml

version: '3.8'  services:   nginx:     image: nginx:1.27.4     ports:       - "80:80"     extra_hosts:       - "host.docker.internal:host-gateway"     volumes:       - ./config/nginx.conf:/etc/nginx/nginx.conf:ro       - ./logs/nginx:/var/log/nginx 

И создадим сопутствующий ./config/nginx.config с перечислением реплик

user nginx; worker_processes auto;  error_log /var/log/nginx/error.log warn; pid /var/run/nginx.pid;  events {     worker_connections 1024;     use epoll;     multi_accept on; }  http {     include /etc/nginx/mime.types;     default_type application/octet-stream;      log_format main '$remote_addr - $remote_user [$time_local] "$request" '                     '$status $body_bytes_sent "$http_referer" '                     '"$http_user_agent" "$http_x_forwarded_for"';      access_log /var/log/nginx/access.log main;      upstream local_websocket_servers {         server host.docker.internal:8081;  # Using host.docker.internal from hosts shared from host machine         server host.docker.internal:8082;         server host.docker.internal:8083;         server host.docker.internal:8084;         server host.docker.internal:8085;         least_conn;     }      map $http_upgrade $connection_upgrade {         default upgrade;         ''      close;     }      server {         listen 80;         server_name localhost;          location / {             proxy_pass http://local_websocket_servers$is_args$args;                          # WebSocket-specific headers             proxy_http_version 1.1;             proxy_set_header Upgrade $http_upgrade;             proxy_set_header Connection $connection_upgrade;                          # Preserve original headers and connection details             proxy_set_header Host $host;             proxy_set_header X-Real-IP $remote_addr;             proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;             proxy_set_header X-Forwarded-Proto $scheme;              # Close upstream if client disconnects             proxy_ignore_client_abort on;                          # Long-lived connection settings             proxy_read_timeout 86400s;             proxy_send_timeout 86400s;                          # Buffer and performance settings             proxy_buffer_size 128k;             proxy_buffers 4 256k;             proxy_busy_buffers_size 256k;         }     } } 

Для запуска реплик через pm2 создадим pm2.config.cjs. По ссылке лежит package.json с скриптами для запуска проекта через npm start

const path = require("path"); const os = require("os"); const dotenv = require('dotenv');  const readConfig = () => dotenv.parse("./.env");  const getPath = (unixPath) => {   return path.resolve(unixPath.replace('~', os.homedir())); };  const createBun = (index) => ({   name: `bun-ws-${index}`,   script: "./src/server.ts",   interpreter: getPath("~/.bun/bin/bun"),   args: ["--server", `--port=808${index}`],   out_file: `./logs/pm2/bun-ws-${index}-out.log`,   error_file: `./logs/pm2/bun-ws-${index}-error.log`,   log_date_format: "YYYY-MM-DD HH:mm:ss",   merge_logs: true,   env: readConfig(), });  module.exports = {   apps: [     /*     {       name: "bun-ws-1",       script: "./src/server.ts",       interpreter: getPath("~/.bun/bin/bun"),       args: ["--server", "--port=8081"],       out_file: "./logs/pm2/bun-ws-1-out.log",       error_file: "./logs/pm2/bun-ws-1-error.log",       log_date_format: "YYYY-MM-DD HH:mm:ss",       merge_logs: true,     },     */     createBun(1),     createBun(2),     createBun(3),     createBun(4),     createBun(5),   ] } 

Как видно, номер порта для оркестрации мы передаем через аргументы командной строки. Наиболее удобно, так как файл .env остается статичным без изменения в рантайме. Для запуска проекта мы испольуем Bun — ускоренный аналог NodeJS по скорости сопоставимый с Golang

Рой агентов

Агент — аналог сцены в telegraf, модель LLM, используящая изолированный system prompt. Текущий агент в Swarm может быть изменен через вызов функции changeToAgent — аналог навигации по сценам в телеграм боте после клика по кнопке

import { Adapter, addAgent, addCompletion, addSwarm } from "agent-swarm-kit"; import { OpenAI } from "openai";  export const OPENAI_COMPLETION = addCompletion({   completionName: "openai_completion",   getCompletion: Adapter.fromOpenAI(new OpenAI({ apiKey: process.env.OPENAI_API_KEY })) });  export const TEST_AGENT = addAgent({   docDescription: "This agent operates within the nginx-balancer-chat project as a test agent, utilizing the OpenaiCompletion to inform users about the actual server port of one of 5 chat instances running on different ports and upstreamed by Nginx to port 80, extracting the port details from the chat history’s system message.",   agentName: "test_agent",   completion: OPENAI_COMPLETION,   prompt: `You are a test agent for Nginx Upstream. Tell user the server port from the chat history (system message)`,   dependsOn: [], });  export const TEST_SWARM = addSwarm({   docDescription: "This swarm serves as the core structure for the nginx-balancer-chat project, managing a single TestAgent as both the sole member and default agent to handle user interactions, leveraging the CohereCompletion to report the specific port of one of 5 upstreamed chat instances balanced by Nginx to port 80.",   swarmName: "test_swarm",   agentList: [TEST_AGENT],   defaultAgent: TEST_AGENT, }); 

Код этого примера запрограммирован так, чтобы LLM модель назвала, с какого порта был проксирован запрос WebSocket. Вместо OPENAI_COMPLETION для каждого агента по отдельности можно использовать LMStudio, Ollama, Cohere, более подробнее в репо

import { Chat, getAgentName, Schema, History } from "agent-swarm-kit"; import type { ServerWebSocket } from "bun"; import { parseArgs } from "util"; import { TEST_SWARM } from "./lib/swarm";  declare function parseInt(value: unknown): number;  type WebSocketData = {   clientId: string; };  const { values } = parseArgs({   args: process.argv,   options: {     server: {       type: "boolean",     },     port: {       type: "string",     },   },   strict: true,   allowPositionals: true, });   History.useHistoryCallbacks({   getSystemPrompt: () => [     `The server port is ${SERVER_PORT}. Tell him that port ASAP`   ] });  const SERVER_PORT = parseInt(values.port);  if (isNaN(SERVER_PORT)) {   throw new Error(`Server port is not a number: ${values.port}`); }  if (values.server) {   Bun.serve({     fetch(req, server) {       const clientId = new URL(req.url).searchParams.get("clientId")!;       if (!clientId) {         return new Response("Invalid clientId", { status: 500 });       }       console.log(`Connected clientId=${clientId} port=${SERVER_PORT}`);       server.upgrade<WebSocketData>(req, {         data: {           clientId,         },       });     },     websocket: {       async open(ws: ServerWebSocket<WebSocketData>) {         await Chat.beginChat(ws.data.clientId, TEST_SWARM);         await Schema.writeSessionMemory(ws.data.clientId, { port: SERVER_PORT });       },       async message(ws: ServerWebSocket<WebSocketData>, message: string) {         const answer = await Chat.sendMessage(ws.data.clientId, message, TEST_SWARM);         ws.send(           JSON.stringify({             data: answer,             agentName: await getAgentName(ws.data.clientId),           })         );       },       async close(ws: ServerWebSocket<WebSocketData>) {         console.log(`Disconnected clientId=${ws.data.clientId} port=${SERVER_PORT}`);         await Chat.dispose(ws.data.clientId, TEST_SWARM);       },     },     port: SERVER_PORT,   }); }  console.log(`Server listening http://localhost:${SERVER_PORT}`) 

Если интересует разработка чатов с более чем одним агентом, посмотрите документация

Спасибо за внимание!


ссылка на оригинал статьи https://habr.com/ru/articles/896222/