Исходный код, разобранный в статье, опубликован в этом репозитории
В интернете существует множество примеров, которые позволяют подключить ChatGPT 3.5 без инструментов к телеграм боту. Однако, когда речь заходит о большом количестве пользователей, не существуют примеров распределения нагрузки по нескольким процессам: все туториалы в интернете запускают монолит с одной репликой

Так же, на практике работы с NodeJS, я сталкивался с проблемой, когда много Promise в статусе pending замедляют работу приложения нагрузкой на сборщик мусора. Добавив сторонние инструменты (дав ChatGPT вызывать внешнии функции с запросами к базе данных) потребуется думать как минимум о создании реплик монолита, чтобы раздувания очереди ожидания данных из базы
Архитектура приложения
Для балансировки нагрузки самым очевидным инструментом является Nginx upstream — инструмент получает WebSocket соединение нового клиента на порт 80 и в порядке очереди проксирует его на 8081, 8082, …, 8085 зависимо от количества реплик. Если клиент не проявляет активность 15 минут, соединение прирывается, если будет новое сообщение, оно создастся заного.

Реплики сохраняют историю переписки в Redis, что позволяет воссоздать контекст не смотря на то, что сообщения обрабатывает новый процесс. Создавать реплики будет PM2 — наиболее нативный способ для приложений стека NodeJS

Так же, используя PM2, за 40$ в месяц можно купить готовое уведомление о инцидентах по Slack / Email, мониторинг сервера по стандартным метрикам таким как CPUs: cores, hardware threads, virtual threads, Memory: capacity, Network interfaces, Storage devices: I/O, capacity, время отклика и тд. Это экономит деньги на Dev Ops до самоокупаемости проекта.
Файлы конфигурации
Чтобы не специализировать Linux на машине разработчика под один проект, обернем Nginx в Docker. Для этого, напишем docker-compose.yaml
version: '3.8' services: nginx: image: nginx:1.27.4 ports: - "80:80" extra_hosts: - "host.docker.internal:host-gateway" volumes: - ./config/nginx.conf:/etc/nginx/nginx.conf:ro - ./logs/nginx:/var/log/nginx
И создадим сопутствующий ./config/nginx.config
с перечислением реплик
user nginx; worker_processes auto; error_log /var/log/nginx/error.log warn; pid /var/run/nginx.pid; events { worker_connections 1024; use epoll; multi_accept on; } http { include /etc/nginx/mime.types; default_type application/octet-stream; log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; access_log /var/log/nginx/access.log main; upstream local_websocket_servers { server host.docker.internal:8081; # Using host.docker.internal from hosts shared from host machine server host.docker.internal:8082; server host.docker.internal:8083; server host.docker.internal:8084; server host.docker.internal:8085; least_conn; } map $http_upgrade $connection_upgrade { default upgrade; '' close; } server { listen 80; server_name localhost; location / { proxy_pass http://local_websocket_servers$is_args$args; # WebSocket-specific headers proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; # Preserve original headers and connection details proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; # Close upstream if client disconnects proxy_ignore_client_abort on; # Long-lived connection settings proxy_read_timeout 86400s; proxy_send_timeout 86400s; # Buffer and performance settings proxy_buffer_size 128k; proxy_buffers 4 256k; proxy_busy_buffers_size 256k; } } }
Для запуска реплик через pm2 создадим pm2.config.cjs
. По ссылке лежит package.json с скриптами для запуска проекта через npm start
const path = require("path"); const os = require("os"); const dotenv = require('dotenv'); const readConfig = () => dotenv.parse("./.env"); const getPath = (unixPath) => { return path.resolve(unixPath.replace('~', os.homedir())); }; const createBun = (index) => ({ name: `bun-ws-${index}`, script: "./src/server.ts", interpreter: getPath("~/.bun/bin/bun"), args: ["--server", `--port=808${index}`], out_file: `./logs/pm2/bun-ws-${index}-out.log`, error_file: `./logs/pm2/bun-ws-${index}-error.log`, log_date_format: "YYYY-MM-DD HH:mm:ss", merge_logs: true, env: readConfig(), }); module.exports = { apps: [ /* { name: "bun-ws-1", script: "./src/server.ts", interpreter: getPath("~/.bun/bin/bun"), args: ["--server", "--port=8081"], out_file: "./logs/pm2/bun-ws-1-out.log", error_file: "./logs/pm2/bun-ws-1-error.log", log_date_format: "YYYY-MM-DD HH:mm:ss", merge_logs: true, }, */ createBun(1), createBun(2), createBun(3), createBun(4), createBun(5), ] }
Как видно, номер порта для оркестрации мы передаем через аргументы командной строки. Наиболее удобно, так как файл .env
остается статичным без изменения в рантайме. Для запуска проекта мы испольуем Bun — ускоренный аналог NodeJS по скорости сопоставимый с Golang
Рой агентов
Агент — аналог сцены в telegraf, модель LLM, используящая изолированный system prompt. Текущий агент в Swarm может быть изменен через вызов функции changeToAgent
— аналог навигации по сценам в телеграм боте после клика по кнопке
import { Adapter, addAgent, addCompletion, addSwarm } from "agent-swarm-kit"; import { OpenAI } from "openai"; export const OPENAI_COMPLETION = addCompletion({ completionName: "openai_completion", getCompletion: Adapter.fromOpenAI(new OpenAI({ apiKey: process.env.OPENAI_API_KEY })) }); export const TEST_AGENT = addAgent({ docDescription: "This agent operates within the nginx-balancer-chat project as a test agent, utilizing the OpenaiCompletion to inform users about the actual server port of one of 5 chat instances running on different ports and upstreamed by Nginx to port 80, extracting the port details from the chat history’s system message.", agentName: "test_agent", completion: OPENAI_COMPLETION, prompt: `You are a test agent for Nginx Upstream. Tell user the server port from the chat history (system message)`, dependsOn: [], }); export const TEST_SWARM = addSwarm({ docDescription: "This swarm serves as the core structure for the nginx-balancer-chat project, managing a single TestAgent as both the sole member and default agent to handle user interactions, leveraging the CohereCompletion to report the specific port of one of 5 upstreamed chat instances balanced by Nginx to port 80.", swarmName: "test_swarm", agentList: [TEST_AGENT], defaultAgent: TEST_AGENT, });
Код этого примера запрограммирован так, чтобы LLM модель назвала, с какого порта был проксирован запрос WebSocket. Вместо OPENAI_COMPLETION
для каждого агента по отдельности можно использовать LMStudio
, Ollama
, Cohere
, более подробнее в репо
import { Chat, getAgentName, Schema, History } from "agent-swarm-kit"; import type { ServerWebSocket } from "bun"; import { parseArgs } from "util"; import { TEST_SWARM } from "./lib/swarm"; declare function parseInt(value: unknown): number; type WebSocketData = { clientId: string; }; const { values } = parseArgs({ args: process.argv, options: { server: { type: "boolean", }, port: { type: "string", }, }, strict: true, allowPositionals: true, }); History.useHistoryCallbacks({ getSystemPrompt: () => [ `The server port is ${SERVER_PORT}. Tell him that port ASAP` ] }); const SERVER_PORT = parseInt(values.port); if (isNaN(SERVER_PORT)) { throw new Error(`Server port is not a number: ${values.port}`); } if (values.server) { Bun.serve({ fetch(req, server) { const clientId = new URL(req.url).searchParams.get("clientId")!; if (!clientId) { return new Response("Invalid clientId", { status: 500 }); } console.log(`Connected clientId=${clientId} port=${SERVER_PORT}`); server.upgrade<WebSocketData>(req, { data: { clientId, }, }); }, websocket: { async open(ws: ServerWebSocket<WebSocketData>) { await Chat.beginChat(ws.data.clientId, TEST_SWARM); await Schema.writeSessionMemory(ws.data.clientId, { port: SERVER_PORT }); }, async message(ws: ServerWebSocket<WebSocketData>, message: string) { const answer = await Chat.sendMessage(ws.data.clientId, message, TEST_SWARM); ws.send( JSON.stringify({ data: answer, agentName: await getAgentName(ws.data.clientId), }) ); }, async close(ws: ServerWebSocket<WebSocketData>) { console.log(`Disconnected clientId=${ws.data.clientId} port=${SERVER_PORT}`); await Chat.dispose(ws.data.clientId, TEST_SWARM); }, }, port: SERVER_PORT, }); } console.log(`Server listening http://localhost:${SERVER_PORT}`)
Если интересует разработка чатов с более чем одним агентом, посмотрите документация
Спасибо за внимание!
ссылка на оригинал статьи https://habr.com/ru/articles/896222/
Добавить комментарий