Меня зовут Соболев Андрей и сегодня мы с вами напишем решение для фреймворка Starlette, которое позволит группировать открытые вебсокет соединения.
Вступление
Starlette довольно молодой фреймворк, и какие-то «плюшки» для него приходится писать самостоятельно. В предыдущей статье я показал как можно реализовать JWT сессии и «Djangoподобную» структуру, сегодня мы рассмотрим как группировать вебсокет соединения.
Для чего нужна группировка вебсокет cоединений?
Допустим я и мой друг (назовем его UnnamedUser) решили пообщаться в чате.
Когда я захожу в комнату с ID=1, мой браузер устанавливает первое вебсокет соединение с сервером (для упрощения назовем его «канал 1»*).
Когда UnnamedUser заходит в комнату с ID=1, его браузер устанавливает второе вебсокет соединение с сервером (для упрощения назовем его «канал 2»*).
* далее по тексту вебсокет соединение будет называться каналом
Технически «канал 1» и «канал 2» это два разных объекта класса WebSocketEndpoint, поэтому в чате мы видим только свои сообщения, а не сообщения других участников (как ожидалось).
Чтобы решить эту проблему нам необходимо объединить наши каналы в группу (к примеру room_1) и делать массовые рассылки при наступлении какого либо события (к примеру кто-то написал в чат).
Где хранить группы?
Для хранения групп воспользуемся обычным словарем, который назовем CHANNEL_GROUPS и объявим глобально:
import time import uuid from simple_print.functions import sprint_f from starlette.endpoints import WebSocketEndpoint CHANNEL_GROUPS = {}
Унаследуемся от WebSocketEndpoint
Чтобы добавить для каждого канала связь с CHANNEL_GROUPS нам необходимо унаследоваться от базового класса WebSocketEndpoint.
Начнем с создания вспомогательного класса Channel:
class Channel: def __init__(self, websocket, expires, encoding): self.channel_uuid = str(uuid.uuid1()) # uid self.websocket = websocket self.expires = expires # срок жизни self.encoding = encoding # тип канала (json, text, bytes) self.created = time.time() # время создания async def _send(self, payload): # приватный метод для отправки в группу websocket = self.websocket if self.encoding == "json": await websocket.send_json(payload) elif self.encoding == "text": await websocket.send_text(payload) elif self.encoding == "bytes": await websocket.send_bytes(payload) else: await websocket.send(payload) self.created = time.time() def _is_expired(self): return self.expires + int(self.created) < time.time() def __repr__(self): return f"{self.channel_uuid}"
В качестве uuid (уникального идентификатора) канала мы будем использовать встроенный в Python механизм идентификации UUID objects
Создадим основной класс ChannelEndpoint от которого мы будем наследоваться в наших endpoints:
class ChannelEndpoint(WebSocketEndpoint): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.expires = 60 * 60 * 24 self.encoding = "json" self.groups = CHANNEL_GROUPS # добавляем связь с CHANNEL_GROUPS async def on_connect(self, websocket, **kwargs): await super().on_connect(websocket, **kwargs) self.channel = Channel(websocket=websocket, expires=self.expires, encoding=self.encoding) async def on_disconnect(self, websocket, close_code): await super().on_disconnect(websocket, close_code) await self._remove(self.channel) async def _remove(self, channel): for group in self.groups: if channel in self.groups[group]: del self.groups[group][channel] async def _validate_name(self, name): if name.isidentifier(): return True raise TypeError("Group names must be valid python identifier only alphanumerics and underscores are accepted") async def _clean_expired(self): for group in self.groups: for channel in self.groups.get(group, {}): if channel._is_expired(): del self.groups[group][channel] async def get_or_create(self, group): # получаем или добавляем группу assert await self._validate_name(group), "Invalid group name" self.groups.setdefault(group, {}) self.groups[group][self.channel] = "" self.group = group async def group_send(self, payload): # отправляем сообщение в группу await self._clean_expired() for channel in self.groups.get(self.group, {}): await channel._send(payload)
При инициализации мы переопределяем базовый метод WebSocketEndpoint on_connect, добавляя к нему объект Channel.
Когда пользователь покидает канал, вызывается приватный метод _remove, для удаления объекта канала из CHANNEL_GROUPS.
В endpoints унаследованных от ChannelEndpoint появляются новые публичные методы:
- get_or_create(self, group) — для получения или создания группы
- group_send(self, payload) — для отправки сообщений в каналы, которые входят в данную группу.
Пример интеграции
routes = [ Route("/chat/", endpoint=ChatView) Route("/chat/ws", endpoint=ChatChannel) ] html = """ <!DOCTYPE html> <html> <head> <title>ws</title> </head> <body> <h1>ChannelEndpoint</h1> <form action="" onsubmit="sendMessage(event)"> <label>group_id: </label><input type="text" id="groupId" autocomplete="off" value="2"><br/> <label>username: </label><input type="text" id="username" autocomplete="off" value="test_user2"><br/> <label>message: </label><input type="text" id="messageText" autocomplete="off" value="test_message2"><br/> <button>Send</button> </form> <ul id='messages'> </ul> <script> var ws = new WebSocket("ws://localhost/chat/chat/ws"); ws.onmessage = function(event) { console.log('Message received %s', event.data) var messages = document.getElementById('messages'); var message = document.createElement('li'); var data = JSON.parse(event.data); message.innerHTML = `<strong>${data.username} :</strong> ${data.message}`; messages.appendChild(message); }; function sendMessage(event) { var username = document.getElementById("username"); var group_id = document.getElementById("groupId"); var input = document.getElementById("messageText"); var data = { "group_id": group_id.value, "username": username.value, "message": input.value, }; console.log('Message send %s', data) ws.send(JSON.stringify(data)); event.preventDefault(); } </script> </body> </html> """ class ChatView(HTTPEndpoint): async def get(self, request): return HTMLResponse(html) class ChatChannel(ChannelEndpoint): # наследуемся от ChannelEndpoint async def on_receive(self, websocket, data): group_id = data["group_id"] message = data["message"] username = data["username"] if message.strip(): group = f"group_{group_id}" await self.get_or_create(group) # получаем группу (и все ее каналы) из словаря CHANNEL_GROUPS payload = { "username": username, "message": message, } await self.group_send(payload) # отправляем сообщение всем участникам группы
Добавим полезные функции
В процессе эксплуатации нам понадобится отправка сообщения в группы из любого места кода, «мониторинг» групп, а также «очистка».
Отправка сообщения в группу из любого места кода:
from channel_box import group_send await group_send('my_chat_1', {"username": "New User", "message": "Hello world"})
«Мониторинг» групп:
from channel_box import groups_show groups_show()
Очистка:
from channel_box import groups_flush groups_flush()
Установка
Менеджер пакетов pip:
pip install channel-box
Исходный код решения:
github.com/Sobolev5/channel-box
Пример работы:
backend.starlette-vue.site/chat/chat1
backend.starlette-vue.site/chat/chat2
Исходный код примера работы:
github.com/Sobolev5/starlette-vue-backend/tree/master/apps/chat
ссылка на оригинал статьи https://habr.com/ru/post/506056/
Добавить комментарий