Исходный код, разобранный в статье, опубликован в этом репозитории
Частое явление в вебе — полная перезагрузка приложения при переходе между страницами. При этом соединение WebSocket разрывается, а новая подписка устанавливается только после начала выполнения загруженного JavaScript, что занимает как минимум секунду. Во время перезагрузки страницы сообщения, отправленные через WebSocket, не будут получены клиентской частью приложения.
Аналогичная проблема возникает и в микросервисной архитектуре при использовании gRPC-стримов для взаимодействия между микросервисами. Когда микросервис отключается, соединение теряется, и попытки доставки сообщений приводят к исключениям. Чтобы решить эти проблемы, был создан starter kit, который использует Redis в качестве очереди доставки сообщений получателю.
Код веб-сервера
Очередь событий была протестирована на тысяче сессий суммарно по 35 сессий одновременно. Интеграционный тест приложен в репозитории, при наличие мощной машины, на которой установлено более 16Гб оперативной памяти, количество одновременных сессий можно увеличить в настройках
Следующий код создает вебсокет-сервер, осуществляющий вещание сообщений, полученных из микросервисной архитектуры через grpc.streamService.makeClient
import { WSContext } from "hono/ws"; import { grpc } from "@modules/remote-grpc"; import { BroadcastRedis } from "@modules/remote-redis"; import { singleshot } from "functools-kit"; import { app, upgradeWebSocket } from "src/config/app"; const connectionManager = new BroadcastRedis("host-ws__redis-emit"); app.get("/api/v1/realtime/ws", upgradeWebSocket(() => { let isClosed = false; const makeConnection = singleshot((sessionId: string, ws: WSContext) => { connectionManager.listenEvent(sessionId, async (data) => { if (isClosed) { return false; } ws.send(JSON.stringify(data)); return true; }); connectionManager.listenDisconnect(sessionId, () => { if (!isClosed) { ws.close(); } }); }); return { onMessage(event, ws) { const { sessionId } = JSON.parse(event.data.toString()); makeConnection(sessionId, ws); }, onClose: () => { isClosed = true; }, } })); grpc.streamService.makeClient<{ side: string, value: string }>("MessageService", async (message) => { connectionManager.emit(message.data); }); export default app;
При получении первого сообщения, вебсокет считывает sessionId
, который со стороны браузера потребуется сохранить в sessionStorage
так, чтобы он не менялся при обновлении страницы. Отправка сообщения клиенту осуществляется через connectionManager
, тот самый буфер, упомянутый в первом абзаце статьи. Коннект создается ровно один раз с использованием функции высшего порядка singleshot
Код буфера WebSocket
Для тестирования приложения был создан echo сервис, каждую секунду отправляющий число секунд с 00:01. Когда веб-сервер отключается, сообщения пишутся в Redis, при повторном подключении, если оно было осуществлено в течение минуты, все сообщения из очереди отправляются в порядке поступления. Новые сообщения встают в конец очереди, сохраняя очередность
Следующий код осуществляет создание ориентированных словарей в Redis, которые осуществляют буферизацию событий так, чтобы клиент мог переподключиться в течение минуты и получить все пропущенные сообщения
import { singleton } from "di-singleton"; import { IPubsubWrappedFn, pubsub, Subject, memoize, randomString, } from "functools-kit"; import BaseMap from "./BaseMap"; import { inject } from "src/core/di"; import RedisService from "src/services/base/RedisService"; import TYPES from "src/config/types"; type MessageListener<Data = any> = (data: Data) => Promise<boolean>; type ConnectionEmit = keyof { "host-sse__redis-emit": never; "host-ws__redis-emit": never; }; const TTL_ONLINE_SECONDS = 1 * 60; export const BroadcastRedis = singleton( class { readonly redisService = inject<RedisService>(TYPES.redisService); _disconnectSubject = new Subject<string>(); _listenerMap = new Map<string, IPubsubWrappedFn>(); _emitMap = new Map<string, MessageListener>(); getEmitQueue = memoize( ([id]) => id, (id: string) => new class extends BaseMap(`${this.connectionEmitId}__${id}`) { } ); constructor(readonly connectionEmitId: ConnectionEmit) {} listenEvent = async <Data = any>( id: string, emit: MessageListener<Data> ) => { const queue = this.getEmitQueue(id); this._emitMap.set(id, emit); if (!this._listenerMap.has(id)) this._listenerMap.set( id, pubsub<Data>( async (data) => { const emit = this._emitMap.get(id); if (emit) { return await emit(data); } return false; }, { onDestroy: async () => { this._listenerMap.delete(id); this._emitMap.delete(id); this._disconnectSubject.next(id); await queue.clear(); this.getEmitQueue.clear(id); }, queue: pubsub.fromMap(queue), } ) ); }; listenDisconnect = (id: string, fn: () => void) => { this._disconnectSubject.filter((channelId) => channelId === id).once(fn); }; _getTotalListeners = async () => { const redis = await this.redisService.getRedis(); const pattern = `${this.connectionEmitId}__*`; const keys = await redis.keys(pattern); const orderKeys = keys.filter((key) => key.endsWith(":online")); const start = `${this.connectionEmitId}__`; const end = ':online'; return [...new Set(orderKeys)].map((key) => key.slice(start.length, key.indexOf(end))); }; _pushOnlineListener = async (id: string) => { const redis = await this.redisService.getRedis(); await redis.setex(`${this.connectionEmitId}__${id}:online`, TTL_ONLINE_SECONDS, 'online'); }; emit = async <Data = any>(data: Data) => { for (const id of await this._getTotalListeners()) { if (this._listenerMap.has(id)) { continue; } const queue = this.getEmitQueue(id); await queue.setWithKeepExpire(randomString(), data); } for (const [id, fn] of this._listenerMap.entries()) { await this._pushOnlineListener(id); fn(data); } }; } ); export type TBroadcastRedis = InstanceType<typeof BroadcastRedis>; export default BroadcastRedis;
Каждое соединения клиента с websocket представляет собой инстанцию анонимного класса, наследующую динамический класс фабрики BaseMap
. Инстанции кешируются функцией высшего порядка memoize
и создаются в runtime после первого обращения, сохраняя одну ссылку для одинаковых sessionId
. Класс BroadcastRedis обернут в di-singleton
, для каждого connectionEmitId
всегда возвращается своя уникальная ссылка.
Код буфера gRPC
Дополнительно, был написан альтернативный канал связи, использующий Server-Sent Events (SSE) + HTTP/2. В отличие от WebSocket не требует отправки heartbeat пакетов, что удобно на Python, где основной поток может быть заморожен, что приведет к переподключениям WebSocket
Буфер gRPC
используется в хост приложении и позволяет повторить запрос несколько раз, если отправитель или получатель не запущены. Так же, если клиент или сервер упадут, при следующем запуске очередь сообщений восстановится автоматически
import { inject } from "src/core/di"; import type ProtoService from "./ProtoService"; import type LoggerService from "./LoggerService"; import TYPES from "src/config/types"; import { CC_GRPC_MAP } from "src/config/params"; import get from "src/utils/get"; import * as grpc from "@grpc/grpc-js"; import { errorData, Subject, createAwaiter, queued, CANCELED_PROMISE_SYMBOL, singleshot, sleep, singlerun, IPubsubArray, PubsubArrayAdapter, randomString, } from "functools-kit"; type Ctor = new (...args: any[]) => grpc.Client; type ServiceName = keyof typeof CC_GRPC_MAP; const CHANNEL_OK_SYMBOL = Symbol("channel-ok"); const CHANNEL_ERROR_SYMBOL = Symbol("channel-error"); const CHANNEL_RECONNECT_SYMBOL = Symbol("channel-reconnect"); interface IMessage<Data = object> { serviceName: string; clientId: string; userId: string; requestId: string; stamp: string; data: Data; } export type SendMessageFn<T = object> = ( outgoing: IMessage<T> ) => Promise<void | typeof CANCELED_PROMISE_SYMBOL>; const GRPC_READY_DELAY = 60_000; const GRPC_MAX_RETRY = 15; export interface IMakeClientConfig<T = object> { queue: IPubsubArray<[string, IMessage<T>]>; } export interface IMakeServerConfig<T = object> { queue: IPubsubArray<[string, IMessage<T>]>; } interface IAwaiter { resolve(): void; } export class StreamService { private readonly protoService = inject<ProtoService>(TYPES.protoService); private readonly loggerService = inject<LoggerService>(TYPES.loggerService); _serverRef = new Map<string, grpc.Server>(); _makeServerInternal = <T = object>( serviceName: ServiceName, connector: (incoming: IMessage<T>) => Promise<void>, reconnect: (error: boolean) => void, attempt: number ): SendMessageFn<any> => { this.loggerService.log( `remote-grpc streamService _makeServerInternal connecting service=${serviceName} attempt=${attempt}` ); const messageSubject = new Subject<IMessage>(); // Много строк, базовая реализация взаимодействия с сервером gRPC return async (outgoing: IMessage) => { await messageSubject.waitForListener(); await messageSubject.next(outgoing); }; }; makeServer = <T = object>( serviceName: ServiceName, connector: (incoming: IMessage<T>) => Promise<void>, { queue = new PubsubArrayAdapter() }: Partial<IMakeServerConfig> = {} ): SendMessageFn<any> => { this.loggerService.log( `remote-grpc streamService makeServer connecting service=${serviceName}` ); const reconnectSubject = new Subject<typeof CHANNEL_RECONNECT_SYMBOL>(); const connectorFn = queued(connector) as typeof connector; const awaiterMap = new Map<string, IAwaiter>(); let attempt = 0; let outgoingFnRef: SendMessageFn<any>; const makeBroadcast = singlerun(async () => { while (await queue.length()) { let isOk = true; try { const first = await queue.getFirst(); if (!first) { break; } const [id, outgoingMsg] = first; const awaiter = awaiterMap.get(id); if (!awaiter) { this.loggerService.log( "remote-grpc streamService makeServer missing awaiter", { id, outgoingMsg } ); continue; } const status = await Promise.race([ outgoingFnRef(outgoingMsg), reconnectSubject.toPromise(), ]); if (status === CHANNEL_RECONNECT_SYMBOL) { this.loggerService.log( `remote-grpc streamService makeServer reconnect service=${serviceName}` ); throw CHANNEL_ERROR_SYMBOL; } await awaiter.resolve(); } catch { isOk = false; } finally { if (isOk) { await queue.shift(); } await sleep(10); } } }); { const makeConnection = () => { attempt += 1; reconnectSubject.next(CHANNEL_RECONNECT_SYMBOL); outgoingFnRef = this._makeServerInternal<T>( serviceName, connectorFn, singleshot(async () => { if (attempt >= GRPC_MAX_RETRY) { await queue.clear(); throw new Error( `remote-grpc streamService makeServer max retry reached service=${serviceName}` ); } makeConnection(); }), attempt ); }; makeConnection(); } const makeCommit = async (outgoing: IMessage) => { const [result, awaiter] = createAwaiter<void>(); const id = randomString(); awaiterMap.set(id, awaiter); await queue.push([id, outgoing]); await makeBroadcast(); return await result; }; const makeInit = singleshot(async () => { const resolveList: Promise<void>[] = []; for await (const [id] of queue) { const [resolve, awaiter] = createAwaiter<void>(); awaiterMap.set(id, awaiter); resolveList.push(resolve); } await makeBroadcast(); await Promise.all(resolveList); }); return async (outgoing: IMessage) => { await makeInit(); await makeCommit(outgoing); attempt = 0; }; }; // Код взаимодействия клиента организован аналогично } export default StreamService;
Дополнительно
Если необходимо организовать InMemory
очередь без использования Redis
, есть адаптер
import { singleton } from "di-singleton"; import { IPubsubWrappedFn, pubsub, Subject } from "functools-kit"; type MessageListener<Data = any> = (data: Data) => Promise<boolean>; export const BroadcastMemory = singleton( class { _disconnectSubject = new Subject<string>(); _listenerMap = new Map<string, IPubsubWrappedFn>(); _emitMap = new Map<string, MessageListener>(); constructor(readonly connectionPoolId: string) { } listenEvent = async <Data = any>( id: string, emit: MessageListener<Data> ) => { this._emitMap.set(id, emit); if (!this._listenerMap.has(id)) this._listenerMap.set( id, pubsub<Data>( async (data) => { const emit = this._emitMap.get(id); if (emit) { return await emit(data); } return false; }, { onDestroy: () => { this._listenerMap.delete(id); this._emitMap.delete(id); this._disconnectSubject.next(id); }, } ) ); }; listenDisconnect = (id: string, fn: () => void) => { this._disconnectSubject.filter((channelId) => channelId === id).once(fn); }; emit = <Data = any>(data: Data) => { for (const fn of this._listenerMap.values()) { fn(data); } }; } ); export type TBroadcastMemory = InstanceType<typeof BroadcastMemory>; export default BroadcastMemory;
Спасибо за внимание!
ссылка на оригинал статьи https://habr.com/ru/articles/866130/
Добавить комментарий