«Я играю, меня не беспокоить!». Как я стал зажигать световое табло On Air при подключении к TeamSpeak

от автора

Иногда мы с товарищами собираемся, чтобы сыграть несколько забегов в игре PUBG: Battlegrounds. Взаимодействие между участниками команды (сквада) тут возведено в абсолют, и голосовая коммуникация жизненно необходима для успешного выживания. В PUBG нельзя расслабляться ни на секунду. Частенько бывают ситуации, когда нужно на слух определять местоположение соперника. Любой посторонний звук в такой момент способен испортить матч всей команде.

Чтобы минимизировать ситуации, когда мои домочадцы пытаются со мной заговорить во время игры, я решил поставить световое табло ON AIR, похожее на то, которое используют на радиостанциях (и да, после такой наглости я даже выжил). Задумка была в том, чтобы табло автоматически зажигалось, когда я нахожусь на сервере TeamSpeak, и отключалось после дисконнекта. Что в итоге у меня получилось — читайте дальше.

Вот оно — табло

Вот оно — табло

Прежде всего я раздобыл готовое табло. Разумеется, можно было бы заморочиться с неоном, но я решил обойтись обычным LED. Цена на такую штуку не слишком велика, но вот с автоматизацией придется повозиться. Первое, что я сделал, — проверил ток потребления:

Тестер Дядюшки Ляо

Тестер Дядюшки Ляо

1,44 А — это действительно много, чтобы брать напрямую с RPI. Запитывать придется от отдельного блока. Управлять буду с помощью реле. Нормальные люди обычно покупают готовые модули, но ждать три недели с маркетплейса не хотелось. Под рукой же лежало реле TRJ-5VDC-SA-CD:

TRJ-5VDC-SA-CD

TRJ-5VDC-SA-CD

Достаточно подать на управляющие контакты 5 В 40 мА — и оно успешно сработает. Проблемы две: GPIO Raspberry Pi оперирует напряжением 3,3 V, а максимум c такого пина можно отдать ~16 мА. Тем не менее на самой гребенке два пина 5 V, которые по факту выдают до 1–2 А в зависимости от блока питания и загруженности платы. Само реле для удержания в открытом состоянии будет потреблять 40–70 мА. Получается, нужно собрать простейший драйвер.

Плюс не стоит забывать, что реле управляется катушкой, которая благодаря коварной физике накапливает в себе магнитную энергию. Достаточно резко оборвать ток — и напряжение на выводах способно подскочить и выжечь нафиг наш нежный GPIO (привет, обратная ЭДС). Так что надо будет подумать о защите в виде диода.

Делаю драйвер

Порывшись в коробочках с радиодеталями (у каждого мужчины такая со временем появляется), я нашел NPN-транзистор КТ3102 в корпусе TO-92 и сигнальный кремниевый диод 1N4148. Вначале я рассчитывал, что мне для открытия транзистора будет достаточно ~2,6 мА, поэтому ошибочно взял резистор на 1кΩ.

Дальше собрал схему:

  • +5 V (pin 2) — анод 1N4148;

  • катод 1N4148 — катушка управления;

  • катушка управления — эмиттер транзистора;

  • GPIO 17 (pin 11) — резистор;

  • резистор — база транзистора;

  • GND (pin 6) — коллектор транзистора.

Быстро спаял навесным монтажом и подключил к «малинке». Чтобы было проще, установил Node-Red при помощи волшебного скрипта:

$ bash <(curl -sL https://github.com/node-red/linux-installers/releases/latest/download/update-nodejs-and-nodered-deb)

Для автостарта Node-Red выполнил:
$ sudo systemctl enable nodered.service

Теперь можно проверить переключение реле, подавая 1 и 0 на GPIO 17 с помощью штатной ноды Inject в связке с rpi-gpio out:

Первый блин комом — вместо отчетливого щелчка я услышал еле-еле заметную попытку. Померял напряжение мультиметром, а там всего лишь 2 V. Значит, транзистор не ушел в насыщение и не открылся. Поэтому заменил номинал резистора (поставил 470Ω вместо 1кΩ). Ток с GPIO-пина вырос до ~5,6 мА — вполне безопасное значение. Проверил — реле уверенно переключается.

MQTT

Теперь пришла пора поставить брокер сообщений Mosquitto и настроить базовую авторизацию по логину и паролю:

$ sudo apt install -y mosquitto mosquitto-clients

Как и Node-Red, имеет смысл добавить Mosquitto в автостарт:

$ sudo systemctl enable mosquitto

В комплекте есть специальная утилита, позволяющая безопасно хранить заданный пароль для указанного пользователя:

$ sudo mosquitto_passwd -c /etc/mosquitto/passwd nodered

По умолчанию Mosquitto читает конфиги в директории /etc/mosquitto/conf.d/:

$ sudo nano /etc/mosquitto/conf.d/local.conf

Прописываем туда порт, запрещаем подключаться без авторизации и говорим, где лежит файл с хешем пароля:

listener 1883
allow_anonymous false
password_file /etc/mosquitto/passwd

Перечитываем конфиг и перезапускаем брокер:

$ sudo systemctl restart mosquitto

Теперь можно возвращаться в веб-интерфейс Node-Red и настроить работу через MQTT. В первую очередь надо добавить сервер. Так как все запущено на Raspberry, прописываем адрес 127.0.0.1 (или localhost):

Переходим на вкладку Security и указываем там ранее созданного пользователя и пароль nodered / password:

Слушать мы будем определенный топик. Назовем его для примера state:

Переделываем тестовую схему — вместо прямого соединения добавляем ноды MQTT в качестве посредника. При нажатии кнопки Deploy снизу каждой из них появится актуальный статус. Если все настроено правильно, то будет надпись connected:

Проверив еще раз, что отправка сообщения переключает реле, соединяем его выводы COM и NO (Normally Open) в разрыв жилы +5 V USB-кабеля. Проще всего взять отдельный USB-удлинитель и именно у него перерезать жилу. Это даст возможность управлять так любым USB-прибором, а не только конкретным табло.

Итак, первая часть проекта завершена. Поднят рабочий сервер Node-Red с брокером Mosquitto, который ожидает сообщения в топике state с 1 или 0 и дает команду на реле для включения табло. Теперь пора написать приложение, которое будет считывать данные с сервера TeamSpeak и отсылать сообщение MQTT.

Soft

У TeamSpeak есть два режима ServerQuery:

  1. RAW (читай Telnet) на 10011.

  2. SSH (да-да, он самый) на 10022.

Первый вариант хорош тем, что туда можно подключаться обычным сокетом, читать и писать строки, а также вытворять прочие безумства. Альтернатива — SSH. Все то же самое, только безопасно. Приложение будет на Python 3.11, к которому я дополнительно установлю пару пакетов с клиентами SSH (paramiko) и MQTT (paho-mqtt).

$ sudo apt install python3-paho-mqtt python3-paramiko

Код, разумеется, писался в нормальной IDE, но для переноса воспользовался редактором nano:

$ sudo nano ts3_presence_to_mqtt.py

Полный код:
import socket, time, sys from typing import Optional, Tuple import paho.mqtt.client as mqtt import paramiko  # ================== НАСТРОЙКИ ================== TS3_HOST = "192.168.88.105" TS3_PROTOCOL = "ssh" TS3_QUERY_PORT = 10022 TS3_LOGIN = "serveradmin" TS3_PASSWORD = "CHANGE_ME" TS3_SID = 1 TS3_TARGET_NICK = "Test"  MQTT_HOST = "192.168.88.27" MQTT_PORT = 1883 MQTT_USERNAME = "nodered" MQTT_PASSWORD = "password" MQTT_TOPIC = "state" MQTT_QOS = 1  CHECK_PERIOD_SEC = 1.0 CONNECT_TIMEOUT = 5.0 RECONNECT_DELAY_SEC = 3.0 # ===============================================  def ts3_unescape(s: str) -> str:     rep = {r"\\s":" ", r"\\p":"|", r"\\/":"/", r"\\n":"\n", r"\\r":"\r", r"\\t":"\t", r"\\v":"\v", r"\\\\":"\\"}     for k,v in rep.items(): s = s.replace(k,v)     return s  def parse_presence(payload: str, target_nick: str) -> bool:     if not payload: return False     for rec in payload.split("|"):         kv = {}         for part in rec.split():             if "=" in part:                 k,v = part.split("=",1)                 kv[k] = ts3_unescape(v)         if kv.get("client_nickname") == target_nick:             return True     return False  class TS3Raw:     def __init__(self, host, port, user, password, sid):         self.host, self.port, self.user, self.password, self.sid = host, port, user, password, sid         self.sock: Optional[socket.socket] = None      def _send(self, cmd: str):         self.sock.sendall((cmd.strip()+"\n").encode())      def _read_until_ok(self) -> Tuple[str, dict]:         self.sock.settimeout(CONNECT_TIMEOUT)         buf = b""         while True:             chunk = self.sock.recv(4096)             if not chunk: raise ConnectionError("TS3 socket closed")             buf += chunk             text = buf.decode("utf-8","replace")             if "\nerror " in text or text.endswith("error "):                 lines = text.strip().splitlines()                 if not lines or not lines[-1].startswith("error "): continue                 err_line = lines[-1][6:]                 err = {}                 for p in err_line.split():                     if "=" in p:                         k,v = p.split("=",1)                         err[k] = ts3_unescape(v)                 payload = "\n".join(lines[:-1])                 return payload, err      def connect(self):         self.sock = socket.create_connection((self.host, self.port), timeout=CONNECT_TIMEOUT)         self.sock.settimeout(1.0)         try: _ = self.sock.recv(4096)         except Exception: pass         self._send(f"login client_login_name={self.user} client_login_password={self.password}")         _, err = self._read_until_ok()         if err.get("id") != "0": raise PermissionError(f"login failed: {err}")         self._send(f"use sid={self.sid}")         _, err = self._read_until_ok()         if err.get("id") != "0": raise RuntimeError(f"use failed: {err}")      def clientlist(self) -> str:         self._send("clientlist")         payload, err = self._read_until_ok()         if err.get("id") != "0": raise RuntimeError(f"clientlist failed: {err}")         return payload      def close(self):         try:             if self.sock:                 try: self._send("quit")                 except Exception: pass                 self.sock.close()         finally:             self.sock = None  class TS3SSH:     def __init__(self, host, port, user, password, sid):         self.host, self.port, self.user, self.password, self.sid = host, port, user, password, sid         self.ssh: Optional[paramiko.SSHClient] = None         self.chan: Optional[paramiko.Channel] = None         self.buf = ""      def connect(self):         self.ssh = paramiko.SSHClient()         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())         self.ssh.connect(self.host, port=self.port, username=self.user, password=self.password, timeout=CONNECT_TIMEOUT, allow_agent=False, look_for_keys=False)         self.chan = self.ssh.invoke_shell(width=160, height=24)         self.chan.settimeout(CONNECT_TIMEOUT)         time.sleep(0.2)         self._drain()         self._send(f"use sid={self.sid}")         self._read_until_ok()      def _send(self, cmd: str):         self.chan.send((cmd.strip()+"\n").encode())      def _drain(self):         try:             while self.chan.recv_ready():                 self.buf += self.chan.recv(4096).decode("utf-8","replace")         except Exception:             pass      def _read_until_ok(self) -> Tuple[str, dict]:         deadline = time.time() + CONNECT_TIMEOUT         while time.time() < deadline:             self._drain()             if "\nerror " in self.buf:                 text = self.buf                 self.buf = ""                 lines = text.strip().splitlines()                 if not lines: continue                 idx = None                 for i in range(len(lines)-1, -1, -1):                     if lines[i].startswith("error "):                         idx = i; break                 if idx is None: continue                 err_line = lines[idx][6:]                 err = {}                 for p in err_line.split():                     if "=" in p:                         k,v = p.split("=",1)                         err[k] = ts3_unescape(v)                 payload = "\n".join(lines[:idx])                 return payload, err             time.sleep(0.05)         raise TimeoutError("TS3 SSH read timeout")      def clientlist(self) -> str:         self._send("clientlist")         payload, err = self._read_until_ok()         if err.get("id") != "0": raise RuntimeError(f"clientlist failed: {err}")         return payload      def close(self):         try:             if self.chan:                  try: self._send("quit")                 except Exception: pass                 self.chan.close()         finally:             if self.ssh:                 self.ssh.close()             self.chan = None             self.ssh = None  class MqttPublisher:     def __init__(self, host, port, username, password, topic, qos=1):         self.client = mqtt.Client()         if username: self.client.username_pw_set(username, password)         self.topic, self.qos = topic, qos         self.client.connect_async(host, port, keepalive=30)         self.client.loop_start()     def publish_state(self, v:int): self.client.publish(self.topic, str(v), qos=self.qos, retain=True)  def make_ts3():     if TS3_PROTOCOL == "ssh":         return TS3SSH(TS3_HOST, TS3_QUERY_PORT, TS3_LOGIN, TS3_PASSWORD, TS3_SID)     else:         return TS3Raw(TS3_HOST, TS3_QUERY_PORT, TS3_LOGIN, TS3_PASSWORD, TS3_SID)  def main():     ts3 = make_ts3()     mqtt_pub = MqttPublisher(MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD, MQTT_TOPIC, MQTT_QOS)     last = None     while True:         try:             if isinstance(ts3, TS3Raw) and ts3.sock is None: ts3.connect()             if isinstance(ts3, TS3SSH) and ts3.chan is None: ts3.connect()             payload = ts3.clientlist()             value = 1 if parse_presence(payload, TS3_TARGET_NICK) else 0         except Exception as e:             sys.stderr.write(f"[WARN] TS3 check failed: {e}\n")             value = 0             try: ts3.close()             except Exception: pass             time.sleep(RECONNECT_DELAY_SEC)         if value != last:             mqtt_pub.publish_state(value)             last = value         time.sleep(CHECK_PERIOD_SEC)  if __name__ == "__main__":     try: main()     except KeyboardInterrupt: pass

Теперь достаточно запустить приложение в фоновом режиме следующей командой:

$ python ts3_presence_to_mqtt.py &

Как только вы подключитесь к серверу TeamSpeak с именем Test, реле щелкнет и табло незамедлительно загорится. Ну а при дисконнекте — потухнет. По умолчанию процесс подключения к серверу происходит разово, а далее оно запрашивает статус каждую секунду. При необходимости интервал можно увеличить, заменив значение переменной CHECK_PERIOD_SEC.

Что получилось

Этот небольшой DIY-проект оказался довольно интересным в реализации. Разумеется, было бы проще взять модуль с одним или несколькими реле, но и самодельный драйвер работает без каких-либо проблем. Программный код в целом тоже получился несложным — нужно было учесть, что TS3 Server Query экранирует спецсимволы, — и накидать собственный вариант «декодера». Для однозначного определения завершения любого ответа реализован парсинг до строки вида error id=[код] msg=[сообщение].

Чтобы комфортно работать с постоянным подключением к серверу, был написан отдельный класс, который занимается всем — от установки соединения с чтения баннера приветствия (при его наличии) до корректного завершения с закрытием сокета. Еще один класс — минимальная обертка над paho-mqtt с асинхронным подключением к брокеру и фоновым loop_start(), при котором клиент сам держит соединение и отправляет keepalive.

От сервера мы получаем полный ответ clientlist, который парсим с помощью функции parse_presence. Записи там идут с разделителем «|», а каждая из них по факту пара «ключ=значение» через пробел. В итоге достаточно собрать словарь полей, снять экранирование спецсимволов и проверить соответствие никнейма из «Настроек» полученному ответу. На основании этого отправить 1 или 0 по MQTT.

Ну что, как вам самоделка?


ссылка на оригинал статьи https://habr.com/ru/articles/945564/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *