Группировка вебсокет соединений для асинхронного фреймворка Starlette

от автора

Доброго дня!

Меня зовут Соболев Андрей и сегодня мы с вами напишем решение для фреймворка Starlette, которое позволит группировать открытые вебсокет соединения.

Вступление

Starlette довольно молодой фреймворк, и какие-то «плюшки» для него приходится писать самостоятельно. В предыдущей статье я показал как можно реализовать JWT сессии и «Djangoподобную» структуру, сегодня мы рассмотрим как группировать вебсокет соединения.

Для чего нужна группировка вебсокет cоединений?

Допустим я и мой друг (назовем его UnnamedUser) решили пообщаться в чате.

Когда я захожу в комнату с ID=1, мой браузер устанавливает первое вебсокет соединение с сервером (для упрощения назовем его «канал 1»*).

Когда UnnamedUser заходит в комнату с ID=1, его браузер устанавливает второе вебсокет соединение с сервером (для упрощения назовем его «канал 2»*).

* далее по тексту вебсокет соединение будет называться каналом

Технически «канал 1» и «канал 2» это два разных объекта класса WebSocketEndpoint, поэтому в чате мы видим только свои сообщения, а не сообщения других участников (как ожидалось).

Чтобы решить эту проблему нам необходимо объединить наши каналы в группу (к примеру room_1) и делать массовые рассылки при наступлении какого либо события (к примеру кто-то написал в чат).

Где хранить группы?

Для хранения групп воспользуемся обычным словарем, который назовем CHANNEL_GROUPS и объявим глобально:

import time import uuid from simple_print.functions import sprint_f from starlette.endpoints import WebSocketEndpoint  CHANNEL_GROUPS = {} 

Унаследуемся от WebSocketEndpoint

Чтобы добавить для каждого канала связь с CHANNEL_GROUPS нам необходимо унаследоваться от базового класса WebSocketEndpoint.

Начнем с создания вспомогательного класса Channel:

class Channel:     def __init__(self, websocket, expires, encoding):         self.channel_uuid = str(uuid.uuid1()) # uid         self.websocket = websocket         self.expires = expires # срок жизни         self.encoding = encoding # тип канала (json, text, bytes)         self.created = time.time() # время создания       async def _send(self, payload): # приватный метод для отправки в группу         websocket = self.websocket         if self.encoding == "json":             await websocket.send_json(payload)         elif self.encoding == "text":             await websocket.send_text(payload)         elif self.encoding == "bytes":             await websocket.send_bytes(payload)         else:             await websocket.send(payload)         self.created = time.time()      def _is_expired(self):         return self.expires + int(self.created) < time.time()      def __repr__(self):         return f"{self.channel_uuid}" 

В качестве uuid (уникального идентификатора) канала мы будем использовать встроенный в Python механизм идентификации UUID objects

Создадим основной класс ChannelEndpoint от которого мы будем наследоваться в наших endpoints:

class ChannelEndpoint(WebSocketEndpoint):     def __init__(self, *args, **kwargs):         super().__init__(*args, **kwargs)         self.expires = 60 * 60 * 24           self.encoding = "json"         self.groups = CHANNEL_GROUPS # добавляем связь с CHANNEL_GROUPS       async def on_connect(self, websocket, **kwargs):         await super().on_connect(websocket, **kwargs)         self.channel = Channel(websocket=websocket, expires=self.expires, encoding=self.encoding)      async def on_disconnect(self, websocket, close_code):         await super().on_disconnect(websocket, close_code)         await self._remove(self.channel)      async def _remove(self, channel):         for group in self.groups:             if channel in self.groups[group]:                 del self.groups[group][channel]      async def _validate_name(self, name):         if name.isidentifier():             return True         raise TypeError("Group names must be valid python identifier only alphanumerics and underscores are accepted")      async def _clean_expired(self):         for group in self.groups:             for channel in self.groups.get(group, {}):                 if channel._is_expired():                     del self.groups[group][channel]      async def get_or_create(self, group): # получаем или добавляем группу         assert await self._validate_name(group), "Invalid group name"         self.groups.setdefault(group, {})         self.groups[group][self.channel] = ""         self.group = group      async def group_send(self, payload): # отправляем сообщение в группу         await self._clean_expired()         for channel in self.groups.get(self.group, {}):             await channel._send(payload) 

При инициализации мы переопределяем базовый метод WebSocketEndpoint on_connect, добавляя к нему объект Channel.

Когда пользователь покидает канал, вызывается приватный метод _remove, для удаления объекта канала из CHANNEL_GROUPS.

В endpoints унаследованных от ChannelEndpoint появляются новые публичные методы:

  • get_or_create(self, group) — для получения или создания группы
  • group_send(self, payload) — для отправки сообщений в каналы, которые входят в данную группу.

Пример интеграции

routes = [     Route("/chat/", endpoint=ChatView)     Route("/chat/ws", endpoint=ChatChannel) ]  html = """ <!DOCTYPE html> <html>     <head>         <title>ws</title>     </head>     <body>         <h1>ChannelEndpoint</h1>         <form action="" onsubmit="sendMessage(event)">             <label>group_id: </label><input type="text" id="groupId" autocomplete="off" value="2"><br/>             <label>username: </label><input type="text" id="username" autocomplete="off" value="test_user2"><br/>                    <label>message: </label><input type="text" id="messageText" autocomplete="off" value="test_message2"><br/>             <button>Send</button>         </form>         <ul id='messages'>         </ul>         <script>             var ws = new WebSocket("ws://localhost/chat/chat/ws");             ws.onmessage = function(event) {                 console.log('Message received %s', event.data)                 var messages = document.getElementById('messages');                 var message = document.createElement('li');                 var data = JSON.parse(event.data);                 message.innerHTML = `<strong>${data.username} :</strong> ${data.message}`;                 messages.appendChild(message);             };             function sendMessage(event) {                 var username = document.getElementById("username");                 var group_id = document.getElementById("groupId");                 var input = document.getElementById("messageText");                 var data = {                     "group_id": group_id.value,                      "username": username.value,                     "message": input.value,                 };                 console.log('Message send %s', data)                 ws.send(JSON.stringify(data));                 event.preventDefault();             }         </script>     </body> </html> """  class ChatView(HTTPEndpoint):     async def get(self, request):         return HTMLResponse(html)  class ChatChannel(ChannelEndpoint): # наследуемся от ChannelEndpoint     async def on_receive(self, websocket, data):         group_id = data["group_id"]         message = data["message"]         username = data["username"]         if message.strip():             group = f"group_{group_id}"              await self.get_or_create(group)  # получаем группу (и все ее каналы) из словаря CHANNEL_GROUPS              payload = {                 "username": username,                 "message": message,             }              await self.group_send(payload) # отправляем сообщение всем участникам группы 

Добавим полезные функции

В процессе эксплуатации нам понадобится отправка сообщения в группы из любого места кода, «мониторинг» групп, а также «очистка».

Отправка сообщения в группу из любого места кода:

from channel_box import group_send await group_send('my_chat_1', {"username": "New User", "message": "Hello world"}) 

«Мониторинг» групп:

from channel_box import groups_show groups_show() 

Очистка:

from channel_box import groups_flush groups_flush() 

Установка

Менеджер пакетов pip:

pip install channel-box 

Исходный код решения:
github.com/Sobolev5/channel-box

Пример работы:
backend.starlette-vue.site/chat/chat1
backend.starlette-vue.site/chat/chat2

Исходный код примера работы:
github.com/Sobolev5/starlette-vue-backend/tree/master/apps/chat

ссылка на оригинал статьи https://habr.com/ru/post/506056/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *