
Иногда мы с товарищами собираемся, чтобы сыграть несколько забегов в игре PUBG: Battlegrounds. Взаимодействие между участниками команды (сквада) тут возведено в абсолют, и голосовая коммуникация жизненно необходима для успешного выживания. В PUBG нельзя расслабляться ни на секунду. Частенько бывают ситуации, когда нужно на слух определять местоположение соперника. Любой посторонний звук в такой момент способен испортить матч всей команде.
Чтобы минимизировать ситуации, когда мои домочадцы пытаются со мной заговорить во время игры, я решил поставить световое табло ON AIR, похожее на то, которое используют на радиостанциях (и да, после такой наглости я даже выжил). Задумка была в том, чтобы табло автоматически зажигалось, когда я нахожусь на сервере TeamSpeak, и отключалось после дисконнекта. Что в итоге у меня получилось — читайте дальше.
Прежде всего я раздобыл готовое табло. Разумеется, можно было бы заморочиться с неоном, но я решил обойтись обычным LED. Цена на такую штуку не слишком велика, но вот с автоматизацией придется повозиться. Первое, что я сделал, — проверил ток потребления:
1,44 А — это действительно много, чтобы брать напрямую с RPI. Запитывать придется от отдельного блока. Управлять буду с помощью реле. Нормальные люди обычно покупают готовые модули, но ждать три недели с маркетплейса не хотелось. Под рукой же лежало реле TRJ-5VDC-SA-CD:
Достаточно подать на управляющие контакты 5 В 40 мА — и оно успешно сработает. Проблемы две: GPIO Raspberry Pi оперирует напряжением 3,3 V, а максимум c такого пина можно отдать ~16 мА. Тем не менее на самой гребенке два пина 5 V, которые по факту выдают до 1–2 А в зависимости от блока питания и загруженности платы. Само реле для удержания в открытом состоянии будет потреблять 40–70 мА. Получается, нужно собрать простейший драйвер.
Плюс не стоит забывать, что реле управляется катушкой, которая благодаря коварной физике накапливает в себе магнитную энергию. Достаточно резко оборвать ток — и напряжение на выводах способно подскочить и выжечь нафиг наш нежный GPIO (привет, обратная ЭДС). Так что надо будет подумать о защите в виде диода.
Делаю драйвер
Порывшись в коробочках с радиодеталями (у каждого мужчины такая со временем появляется), я нашел NPN-транзистор КТ3102 в корпусе TO-92 и сигнальный кремниевый диод 1N4148. Вначале я рассчитывал, что мне для открытия транзистора будет достаточно ~2,6 мА, поэтому ошибочно взял резистор на 1кΩ.
Дальше собрал схему:
-
+5 V (pin 2) — анод 1N4148;
-
катод 1N4148 — катушка управления;
-
катушка управления — эмиттер транзистора;
-
GPIO 17 (pin 11) — резистор;
-
резистор — база транзистора;
-
GND (pin 6) — коллектор транзистора.

Быстро спаял навесным монтажом и подключил к «малинке». Чтобы было проще, установил Node-Red при помощи волшебного скрипта:
$ bash <(curl -sL https://github.com/node-red/linux-installers/releases/latest/download/update-nodejs-and-nodered-deb)
Для автостарта Node-Red выполнил:
$ sudo systemctl enable nodered.service
Теперь можно проверить переключение реле, подавая 1 и 0 на GPIO 17 с помощью штатной ноды Inject в связке с rpi-gpio out:

Первый блин комом — вместо отчетливого щелчка я услышал еле-еле заметную попытку. Померял напряжение мультиметром, а там всего лишь 2 V. Значит, транзистор не ушел в насыщение и не открылся. Поэтому заменил номинал резистора (поставил 470Ω вместо 1кΩ). Ток с GPIO-пина вырос до ~5,6 мА — вполне безопасное значение. Проверил — реле уверенно переключается.
MQTT
Теперь пришла пора поставить брокер сообщений Mosquitto и настроить базовую авторизацию по логину и паролю:
$ sudo apt install -y mosquitto mosquitto-clients
Как и Node-Red, имеет смысл добавить Mosquitto в автостарт:
$ sudo systemctl enable mosquitto
В комплекте есть специальная утилита, позволяющая безопасно хранить заданный пароль для указанного пользователя:
$ sudo mosquitto_passwd -c /etc/mosquitto/passwd nodered
По умолчанию Mosquitto читает конфиги в директории /etc/mosquitto/conf.d/:
$ sudo nano /etc/mosquitto/conf.d/local.conf
Прописываем туда порт, запрещаем подключаться без авторизации и говорим, где лежит файл с хешем пароля:
listener 1883
allow_anonymous false
password_file /etc/mosquitto/passwd
Перечитываем конфиг и перезапускаем брокер:
$ sudo systemctl restart mosquitto
Теперь можно возвращаться в веб-интерфейс Node-Red и настроить работу через MQTT. В первую очередь надо добавить сервер. Так как все запущено на Raspberry, прописываем адрес 127.0.0.1 (или localhost):

Переходим на вкладку Security и указываем там ранее созданного пользователя и пароль nodered / password:

Слушать мы будем определенный топик. Назовем его для примера state:

Переделываем тестовую схему — вместо прямого соединения добавляем ноды MQTT в качестве посредника. При нажатии кнопки Deploy снизу каждой из них появится актуальный статус. Если все настроено правильно, то будет надпись connected:

Проверив еще раз, что отправка сообщения переключает реле, соединяем его выводы COM и NO (Normally Open) в разрыв жилы +5 V USB-кабеля. Проще всего взять отдельный USB-удлинитель и именно у него перерезать жилу. Это даст возможность управлять так любым USB-прибором, а не только конкретным табло.
Итак, первая часть проекта завершена. Поднят рабочий сервер Node-Red с брокером Mosquitto, который ожидает сообщения в топике state с 1 или 0 и дает команду на реле для включения табло. Теперь пора написать приложение, которое будет считывать данные с сервера TeamSpeak и отсылать сообщение MQTT.
Soft
У TeamSpeak есть два режима ServerQuery:
-
RAW (читай Telnet) на 10011.
-
SSH (да-да, он самый) на 10022.
Первый вариант хорош тем, что туда можно подключаться обычным сокетом, читать и писать строки, а также вытворять прочие безумства. Альтернатива — SSH. Все то же самое, только безопасно. Приложение будет на Python 3.11, к которому я дополнительно установлю пару пакетов с клиентами SSH (paramiko) и MQTT (paho-mqtt).
$ sudo apt install python3-paho-mqtt python3-paramiko
Код, разумеется, писался в нормальной IDE, но для переноса воспользовался редактором nano:
$ sudo nano ts3_presence_to_mqtt.py
Полный код:
import socket, time, sys from typing import Optional, Tuple import paho.mqtt.client as mqtt import paramiko # ================== НАСТРОЙКИ ================== TS3_HOST = "192.168.88.105" TS3_PROTOCOL = "ssh" TS3_QUERY_PORT = 10022 TS3_LOGIN = "serveradmin" TS3_PASSWORD = "CHANGE_ME" TS3_SID = 1 TS3_TARGET_NICK = "Test" MQTT_HOST = "192.168.88.27" MQTT_PORT = 1883 MQTT_USERNAME = "nodered" MQTT_PASSWORD = "password" MQTT_TOPIC = "state" MQTT_QOS = 1 CHECK_PERIOD_SEC = 1.0 CONNECT_TIMEOUT = 5.0 RECONNECT_DELAY_SEC = 3.0 # =============================================== def ts3_unescape(s: str) -> str: rep = {r"\\s":" ", r"\\p":"|", r"\\/":"/", r"\\n":"\n", r"\\r":"\r", r"\\t":"\t", r"\\v":"\v", r"\\\\":"\\"} for k,v in rep.items(): s = s.replace(k,v) return s def parse_presence(payload: str, target_nick: str) -> bool: if not payload: return False for rec in payload.split("|"): kv = {} for part in rec.split(): if "=" in part: k,v = part.split("=",1) kv[k] = ts3_unescape(v) if kv.get("client_nickname") == target_nick: return True return False class TS3Raw: def __init__(self, host, port, user, password, sid): self.host, self.port, self.user, self.password, self.sid = host, port, user, password, sid self.sock: Optional[socket.socket] = None def _send(self, cmd: str): self.sock.sendall((cmd.strip()+"\n").encode()) def _read_until_ok(self) -> Tuple[str, dict]: self.sock.settimeout(CONNECT_TIMEOUT) buf = b"" while True: chunk = self.sock.recv(4096) if not chunk: raise ConnectionError("TS3 socket closed") buf += chunk text = buf.decode("utf-8","replace") if "\nerror " in text or text.endswith("error "): lines = text.strip().splitlines() if not lines or not lines[-1].startswith("error "): continue err_line = lines[-1][6:] err = {} for p in err_line.split(): if "=" in p: k,v = p.split("=",1) err[k] = ts3_unescape(v) payload = "\n".join(lines[:-1]) return payload, err def connect(self): self.sock = socket.create_connection((self.host, self.port), timeout=CONNECT_TIMEOUT) self.sock.settimeout(1.0) try: _ = self.sock.recv(4096) except Exception: pass self._send(f"login client_login_name={self.user} client_login_password={self.password}") _, err = self._read_until_ok() if err.get("id") != "0": raise PermissionError(f"login failed: {err}") self._send(f"use sid={self.sid}") _, err = self._read_until_ok() if err.get("id") != "0": raise RuntimeError(f"use failed: {err}") def clientlist(self) -> str: self._send("clientlist") payload, err = self._read_until_ok() if err.get("id") != "0": raise RuntimeError(f"clientlist failed: {err}") return payload def close(self): try: if self.sock: try: self._send("quit") except Exception: pass self.sock.close() finally: self.sock = None class TS3SSH: def __init__(self, host, port, user, password, sid): self.host, self.port, self.user, self.password, self.sid = host, port, user, password, sid self.ssh: Optional[paramiko.SSHClient] = None self.chan: Optional[paramiko.Channel] = None self.buf = "" def connect(self): self.ssh = paramiko.SSHClient() self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh.connect(self.host, port=self.port, username=self.user, password=self.password, timeout=CONNECT_TIMEOUT, allow_agent=False, look_for_keys=False) self.chan = self.ssh.invoke_shell(width=160, height=24) self.chan.settimeout(CONNECT_TIMEOUT) time.sleep(0.2) self._drain() self._send(f"use sid={self.sid}") self._read_until_ok() def _send(self, cmd: str): self.chan.send((cmd.strip()+"\n").encode()) def _drain(self): try: while self.chan.recv_ready(): self.buf += self.chan.recv(4096).decode("utf-8","replace") except Exception: pass def _read_until_ok(self) -> Tuple[str, dict]: deadline = time.time() + CONNECT_TIMEOUT while time.time() < deadline: self._drain() if "\nerror " in self.buf: text = self.buf self.buf = "" lines = text.strip().splitlines() if not lines: continue idx = None for i in range(len(lines)-1, -1, -1): if lines[i].startswith("error "): idx = i; break if idx is None: continue err_line = lines[idx][6:] err = {} for p in err_line.split(): if "=" in p: k,v = p.split("=",1) err[k] = ts3_unescape(v) payload = "\n".join(lines[:idx]) return payload, err time.sleep(0.05) raise TimeoutError("TS3 SSH read timeout") def clientlist(self) -> str: self._send("clientlist") payload, err = self._read_until_ok() if err.get("id") != "0": raise RuntimeError(f"clientlist failed: {err}") return payload def close(self): try: if self.chan: try: self._send("quit") except Exception: pass self.chan.close() finally: if self.ssh: self.ssh.close() self.chan = None self.ssh = None class MqttPublisher: def __init__(self, host, port, username, password, topic, qos=1): self.client = mqtt.Client() if username: self.client.username_pw_set(username, password) self.topic, self.qos = topic, qos self.client.connect_async(host, port, keepalive=30) self.client.loop_start() def publish_state(self, v:int): self.client.publish(self.topic, str(v), qos=self.qos, retain=True) def make_ts3(): if TS3_PROTOCOL == "ssh": return TS3SSH(TS3_HOST, TS3_QUERY_PORT, TS3_LOGIN, TS3_PASSWORD, TS3_SID) else: return TS3Raw(TS3_HOST, TS3_QUERY_PORT, TS3_LOGIN, TS3_PASSWORD, TS3_SID) def main(): ts3 = make_ts3() mqtt_pub = MqttPublisher(MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD, MQTT_TOPIC, MQTT_QOS) last = None while True: try: if isinstance(ts3, TS3Raw) and ts3.sock is None: ts3.connect() if isinstance(ts3, TS3SSH) and ts3.chan is None: ts3.connect() payload = ts3.clientlist() value = 1 if parse_presence(payload, TS3_TARGET_NICK) else 0 except Exception as e: sys.stderr.write(f"[WARN] TS3 check failed: {e}\n") value = 0 try: ts3.close() except Exception: pass time.sleep(RECONNECT_DELAY_SEC) if value != last: mqtt_pub.publish_state(value) last = value time.sleep(CHECK_PERIOD_SEC) if __name__ == "__main__": try: main() except KeyboardInterrupt: pass
Теперь достаточно запустить приложение в фоновом режиме следующей командой:
$ python ts3_presence_to_mqtt.py &
Как только вы подключитесь к серверу TeamSpeak с именем Test, реле щелкнет и табло незамедлительно загорится. Ну а при дисконнекте — потухнет. По умолчанию процесс подключения к серверу происходит разово, а далее оно запрашивает статус каждую секунду. При необходимости интервал можно увеличить, заменив значение переменной CHECK_PERIOD_SEC.
Что получилось
Этот небольшой DIY-проект оказался довольно интересным в реализации. Разумеется, было бы проще взять модуль с одним или несколькими реле, но и самодельный драйвер работает без каких-либо проблем. Программный код в целом тоже получился несложным — нужно было учесть, что TS3 Server Query экранирует спецсимволы, — и накидать собственный вариант «декодера». Для однозначного определения завершения любого ответа реализован парсинг до строки вида error id=[код] msg=[сообщение].
Чтобы комфортно работать с постоянным подключением к серверу, был написан отдельный класс, который занимается всем — от установки соединения с чтения баннера приветствия (при его наличии) до корректного завершения с закрытием сокета. Еще один класс — минимальная обертка над paho-mqtt с асинхронным подключением к брокеру и фоновым loop_start(), при котором клиент сам держит соединение и отправляет keepalive.
От сервера мы получаем полный ответ clientlist, который парсим с помощью функции parse_presence. Записи там идут с разделителем «|», а каждая из них по факту пара «ключ=значение» через пробел. В итоге достаточно собрать словарь полей, снять экранирование спецсимволов и проверить соответствие никнейма из «Настроек» полученному ответу. На основании этого отправить 1 или 0 по MQTT.
Ну что, как вам самоделка?
ссылка на оригинал статьи https://habr.com/ru/articles/945564/
Добавить комментарий