Оцифровка показаний стрелочного манометра в Home Assistant

от автора

Давно мучал вопрос передачи показаний давления системы отопления со штатного манометра газового котла. Для этого несколько лет назад была приобретена камера ESP32-CAM и интегрирована в Home Assistant посредством ESPHome.

Камеру я направил прямо на манометр, что позволило мне периодически вручную удаленно контролировать давление в системе отопления, и, при необходимости, открывать кран подачи воды в систему (также удалённо).

Штатный светодиод ESP32-CAM помогает разглядеть манометр в закрытом шкафу

Штатный светодиод ESP32-CAM помогает разглядеть манометр в закрытом шкафу

Для того, чтобы хоть как-то облагородить этот «аналоговый» процесс в Node-RED была создана автоматизация, завязанная на контроле значений энергопотребления котла и насосов:

Работало это так:

  1. В Телеграм приходит уведомление об ошибке котла;

  2. Я лезу в интерфейс HA и в камере смотрю какое давление;

  3. Если давление низкое — открываю кран подпитки.

Уже неплохо, но руки-то чешутся, поэтому я решил, что изображение надо распознать и вывести давление в HA в нормальном виде.

Выбор способа оцифровки

Помимо Raspberry Pi 4B, на котором у меня крутится HA у меня есть самосборный NAS c Xpenology на борту. Поэтому я решил задействовать в этом деле его.

Установка контейнера Python в Xpenology

Первым делом в Container Manager загружаем образ Python 3-10-slim

Создание контейнера:

  1. Открываем вкладку Образы, выбираем python и нажимаем Запустить;

  2. Как-нибудь называем контейнер;

  3. На этапе Настройка томов:

    • Нажимаем Добавить папку

    • Выбираем папку /volume1/docker/gauge

    • В контейнере указываем путь: /app

  4. Нажимаем Далее, потом Применить

Получаем такой результат

Получаем такой результат

Далее временно включаем доступ к NAS по SSH в панели управления:

И подключаемся к с серверу с помощью Putty

Вводим логин, пароль, переключаемся на root, получаем ID контейнера:

docker ps

Заходим в контейнер:

docker exec -it ID_контейнера bash

Устанавливаем необходимые библиотеки:

apt update && apt install -y python3-pip libgl1 libglib2.0-0 apt-get install -y python3-pip python3-dev apt-get install -y libglib2.0-0 libsm6 libxext6 libxrender-dev apt-get install -y qt5-qmake qtbase5-dev-tools qtchooser apt-get install -y libx11-dev libgl1-mesa-glx libfontconfig1 libxkbcommon0 apt-get install -y libxcb-xinerama0 pip install opencv-python numpy pip3 install opencv-python matplotlib

Создаем скрипт gauge_reader.py и кладем его в папку docker\gauge на NAS. К нему мы вернемся чуть позже.

Настройка Home Assistant

Теперь мне нужно положить в эту же папку, где лежит скрипт само изображение, которое нужно распознать. Для этого делаем следующее:

  1. В HA переходим в Настройки, Хранилище, Добавить сетевое хранилище, указываем путь к папке docker на NAS:

После создаем автоматизацию, которая будет каждый час включать светодиод на ESP32-CAM, делать снимок, выкладывать его в папку gauge, которую мы подключили шагом выше, и выключать светодиод:

alias: ESP Cam snapshot каждый час description: "" triggers:   - minutes: 0     trigger: time_pattern actions:   - target:       entity_id: switch.pressure_boiler_flash_led     action: switch.turn_on     data: {}   - delay:       hours: 0       minutes: 0       seconds: 10       milliseconds: 0   - data:       entity_id: camera.pressure_boiler_my_camera       filename: /media/gauge/sample.jpg     action: camera.snapshot   - delay:       hours: 0       minutes: 0       seconds: 10       milliseconds: 0   - target:       entity_id: switch.pressure_boiler_flash_led     action: switch.turn_off     data: {} mode: single 

И сразу создадим сенсор, в который будет присылать NAS результат распознавания давления:

sensor:     - name: "Boiler Pressure"       state_topic: "sensors/boiler/pressure"       unit_of_measurement: "bar"       device_class: pressure       unique_id: pressure_boiler

Подготовка фото

Теперь надо немного поколдовать с самим объектом — манометром. Спустя пару десятков итераций со скриптом я пришел к тому, что несмотря на все ухищрения, качество изображения не позволяет со 100% результатом распознать стрелку манометра. Поэтому я наклеил вокруг манометра квадрат для возможности правильной обрезки и коррекции перспективы. Получилось так:

ESP32-CAM в лего-кронштейне. Над манометром временно прилепил белый лист бумаги, т.к. рельеф корпуса котла иногда вносил небольшую неразбериху в определении угла квадрата.

ESP32-CAM в лего-кронштейне. Над манометром временно прилепил белый лист бумаги, т.к. рельеф корпуса котла иногда вносил небольшую неразбериху в определении угла квадрата.

Написание скрипта (хвала нейросетям!)

Теперь переходим к скрипту. Задачу по определению давления разбиваем на следующие шаги:

  1. Обнаруживаем на фото четырехугольник;

  2. Корректируем перспективу — превращаем четырехугольник в квадрат и обрезаем изображение;

  3. Определяем центр и стрелку манометра;

  4. Замеряем угол отклонения стрелки и по таблице калибровки переводим угол-градусы в давление-бары;

  5. Отрисовываем некоторые этапы для контроля выполнения кода скрипта;

  6. Отправляем полученные данные в mqtt HA.

import cv2 import numpy as np import math import logging import paho.mqtt.client as mqtt from typing import Optional, Tuple, List  # Настройка логгирования logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__)  # Конфигурация class Config:     MQTT_BROKER = "192.168.1.****"     MQTT_PORT = ****     MQTT_TOPIC = "sensors/boiler/pressure"     MQTT_USER = "****"     MQTT_PASS = "****"     MQTT_TIMEOUT = 5          CALIBRATION = {         215: 0.0,         155: 1.0,         90: 2.0,         29: 3.0,         327: 4.0     }          OUTPUT_DIR = '/app/'     OUTPUT_SIZE = 600     BLUR_SIZE = (5, 5)     ADAPTIVE_THRESH_BLOCK = 11     ADAPTIVE_THRESH_C = 2     HOUGH_CIRCLES_PARAMS = {         'dp': 1,         'minDist': 100,         'param1': 50,         'param2': 30,         'minRadius': 100,         'maxRadius': 200     }     HOUGH_LINES_PARAMS = {         'rho': 1,         'theta': np.pi/180,         'threshold': 50,         'minLineLength': 150,         'maxLineGap': 10     }  def detect_quadrilateral(image: np.ndarray) -> Tuple[Optional[List[Tuple[int, int]]], np.ndarray]:     """Определение углов четырехугольника вокруг манометра"""     gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)     _, thresh = cv2.threshold(gray, 50, 255, cv2.THRESH_BINARY_INV)          kernel = np.ones((5,5), np.uint8)     thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)     thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)          contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)          if not contours:         return None, image.copy()          largest_contour = max(contours, key=cv2.contourArea)     epsilon = 0.03 * cv2.arcLength(largest_contour, True)     approx = cv2.approxPolyDP(largest_contour, epsilon, True)          if len(approx) != 4:         for eps in [0.02, 0.04, 0.05]:             epsilon = eps * cv2.arcLength(largest_contour, True)             approx = cv2.approxPolyDP(largest_contour, epsilon, True)             if len(approx) == 4:                 break         else:             return None, image.copy()          points = np.array([point[0] for point in approx], dtype=np.float32)     center = np.mean(points, axis=0)          def sort_key(point):         return math.atan2(point[1] - center[1], point[0] - center[0])          sorted_points = sorted(points, key=sort_key, reverse=True)     bottom_idx = np.argmax([p[1] for p in sorted_points])     sorted_points = np.roll(sorted_points, -bottom_idx, axis=0)          if bottom_idx == 1:         sorted_points = np.roll(sorted_points, -1, axis=0)          corners = [tuple(map(int, p)) for p in sorted_points]          vis = image.copy()     for i, corner in enumerate(corners):         cv2.circle(vis, corner, 10, (0, 255, 0), -1)         cv2.putText(vis, f"{i+1}", (corner[0]+15, corner[1]+15),                     cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)          for i in range(4):         cv2.line(vis, corners[i], corners[(i+1)%4], (255, 0, 0), 2)          return corners, vis  def correct_perspective(image: np.ndarray, corners: List[Tuple[int, int]]) -> np.ndarray:     """Коррекция перспективы на основе найденных углов"""     size = Config.OUTPUT_SIZE     dst_points = np.array([         [0, size-1],         [size-1, size-1],         [size-1, 0],         [0, 0]     ], dtype=np.float32)          src_points = np.array(corners, dtype=np.float32)     matrix = cv2.getPerspectiveTransform(src_points, dst_points)     corrected = cv2.warpPerspective(image, matrix, (size, size))          return corrected  def enhance_image(image: np.ndarray, roi: Tuple[int, int, int, int]) -> Tuple[np.ndarray, np.ndarray]:     """Улучшение изображения и выделение стрелки"""     x, y, w, h = roi     cropped = image[y:y+h, x:x+w]     gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)     blurred = cv2.GaussianBlur(gray, Config.BLUR_SIZE, 0)     thresh = cv2.adaptiveThreshold(         blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,         cv2.THRESH_BINARY_INV, Config.ADAPTIVE_THRESH_BLOCK, Config.ADAPTIVE_THRESH_C     )     return thresh, cropped  def find_dial_center(image: np.ndarray, roi: Tuple[int, int, int, int]) -> Tuple[Tuple[int, int, int], np.ndarray]:     """Определение центра циферблата"""     x, y, w, h = roi     cropped = image[y:y+h, x:x+w]     gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)     vis = cropped.copy()          circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, **Config.HOUGH_CIRCLES_PARAMS)          if circles is not None:         circles = np.uint16(np.around(circles))         best = circles[0][0]         cv2.circle(vis, (best[0], best[1]), best[2], (0, 255, 0), 2)         cv2.circle(vis, (best[0], best[1]), 2, (0, 0, 255), 3)         center_vis_path = f"{Config.OUTPUT_DIR}center_detection.jpg"         cv2.imwrite(center_vis_path, vis)         return (x + best[0], y + best[1], best[2]), vis          center = (x + w//2, y + h//2, min(w, h)//2)     cv2.circle(vis, (w//2, h//2), min(w, h)//2, (0, 255, 0), 2)     center_vis_path = f"{Config.OUTPUT_DIR}center_detection.jpg"     cv2.imwrite(center_vis_path, vis)     return center, vis  def find_arrow_angle(thresh: np.ndarray, original: np.ndarray,                      roi: Tuple[int, int, int, int],                      center_info: Tuple[int, int, int]) -> Tuple[Optional[float], Optional[Tuple[int, int]], np.ndarray]:     """Поиск угла стрелки"""     x, y, w, h = roi     cX, cY, radius = center_info     cropped = original[y:y+h, x:x+w]     cX_roi, cY_roi = cX - x, cY - y          params = Config.HOUGH_LINES_PARAMS.copy()     params['minLineLength'] = radius * 0.5          edges = cv2.Canny(thresh, 50, 150)     lines = cv2.HoughLinesP(edges, **params)          if lines is not None:         filtered = []         for line in lines:             x1, y1, x2, y2 = line[0]             dist1 = math.hypot(x1 - cX_roi, y1 - cY_roi)             dist2 = math.hypot(x2 - cX_roi, y2 - cY_roi)             if min(dist1, dist2) < radius * 0.3:                 filtered.append(line)                  if filtered:             longest = max(filtered, key=lambda l: math.hypot(l[0][0]-l[0][2], l[0][1]-l[0][3]))             x1, y1, x2, y2 = longest[0]                          tip = (x2, y2) if math.hypot(x1-cX_roi, y1-cY_roi) < math.hypot(x2-cX_roi, y2-cY_roi) else (x1, y1)             angle = math.degrees(math.atan2(cY_roi - tip[1], tip[0] - cX_roi)) % 360                          cv2.line(cropped, (cX_roi, cY_roi), tip, (0, 0, 255), 3)             return angle, (cX, cY), cropped          return None, None, cropped  def setup_mqtt() -> mqtt.Client:     """Настройка MQTT клиента"""     client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)     client.username_pw_set(Config.MQTT_USER, Config.MQTT_PASS)     client.connect(Config.MQTT_BROKER, Config.MQTT_PORT, Config.MQTT_TIMEOUT)     return client  def calibrate_pressure(angle: float) -> float:     """Калибровка давления"""     angles = np.array(sorted(Config.CALIBRATION.keys()))     pressures = np.array([Config.CALIBRATION[a] for a in angles])     return float(np.interp(angle, angles, pressures))  def send_mqtt(pressure: float) -> bool:     """Отправка данных через MQTT"""     try:         client = setup_mqtt()         client.publish(Config.MQTT_TOPIC, f"{pressure:.2f}")         client.disconnect()         return True     except Exception as e:         logger.error(f"Ошибка MQTT: {str(e)}")         return False  def process_image(image_path: str) -> Optional[float]:     """Основная функция обработки изображения"""     try:         image = cv2.imread(image_path)         if image is None:             raise FileNotFoundError(f"Не удалось загрузить изображение: {image_path}")                  quad_corners, quad_vis = detect_quadrilateral(image)         if quad_corners is None:             logger.error("Не удалось обнаружить четырехугольник")             return None                      cv2.imwrite(f"{Config.OUTPUT_DIR}quadrilateral_detection.jpg", quad_vis)         corrected_image = correct_perspective(image, quad_corners)         cv2.imwrite(f"{Config.OUTPUT_DIR}corrected_perspective.jpg", corrected_image)                  corrected_roi = (0, 0, Config.OUTPUT_SIZE, Config.OUTPUT_SIZE)         thresh, cropped = enhance_image(corrected_image, corrected_roi)         center, _ = find_dial_center(corrected_image, corrected_roi)         angle, _, processed = find_arrow_angle(thresh, corrected_image, corrected_roi, center)                  if angle is None:             logger.error("Не удалось определить угол стрелки")             return None                  pressure = calibrate_pressure(angle)         cv2.putText(processed, f"Angle: {angle:.1f}°", (20, 40),                     cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)         cv2.putText(processed, f"Pressure: {pressure:.2f} bar", (20, 80),                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)                  cv2.imwrite(f"{Config.OUTPUT_DIR}processed.jpg", processed)         return pressure              except Exception as e:         logger.error(f"Ошибка обработки: {str(e)}")         return None  def main():     image_path = '/app/sample.jpg'     pressure = process_image(image_path)     if pressure is not None:         send_mqtt(pressure)  if __name__ == "__main__":     main()

Код скрипта писал трудяга DeepSeek под моим чутким руководством. Сначала пытался это сделать в ChatGPT, но после достижения бесплатного лимита запросов он резко «тупел» и дело шло уже не так хорошо:)

Определение углов

Определение углов
Коррекция перспективы и обрезка изображения

Коррекция перспективы и обрезка изображения
Определение центра и окружности манометра

Определение центра и окружности манометра
Поиск стрелки и определение давления по углу отклонения стрелки

Поиск стрелки и определение давления по углу отклонения стрелки

Настройка автозапуска скрипта в Xpenology

Осталось дело за малым — настраиваем на NAS автозапуск скрипта. Переходим в Панель управления, Планировщик задач, Создать, Запланированная задача, Скрипт заданный пользователем.

Во вкладке Общие указываем, что задача должна выполняться от пользователя root. Расписание — ежедневно каждый час. В Настройки задач в поле Выполнить команду прописываем docker exec ID_контейнера python3 /app/gauge_reader.py.

Результат

Теперь раз в час HA делает фотографию манометра и кладет ее в папку NAS. А NAS спустя пять минут распознает это дело шлёт в mqtt фактическое давление в системе отопления.

Да, я в курсе, что сейчас уже есть DIY zigbee манометры. Может быть я соберусь с мыслями, возьму паяльник и врежусь в систему отопления и в систему водоснабжения, чтобы можно было автоматизировать включение крана подпитки системы отопления только в случае наличия холодной воды. Но пока пусть поработает так.

Почему не реализовал распознавание изображения на Raspberry Pi? Просто так захотелось))

Сама идея взаимодействия двух устройств в домашней сети показалась интересной.


ссылка на оригинал статьи https://habr.com/ru/articles/908238/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *