Привет, Хабр!
Подготовительным этапом для видеоаналитики с применением методов машинного обучения — является выгрузка записей из видеорегистраторов Hikvision, что является достаточно длительной задачей, особенно если регистраторов несколько, давайте попытаемся разобраться как выполнить эту работу быстрее и удобнее, используя сервер.

Основанием для разработки «Task-Сервера» — послужила задача по выгрузке видеозаписей с видеорегистраторов Hikvision. В работе была использована библиотека Hikload которая подключалась к регистраторам и загружала видеоролики.
Для начала немного о «Task-Сервере» (Рис. 1), в данном случае это небольшой сервер, хранящий в базе данных или файле заранее подготовленные строки, которые будут переданы в качестве параметров клиентам, подключенным к хосту. Клиент же в свою очередь запускает скрипт с полученными данными и после выполнения запрашивает новую задачу до тех пор, пока задачи не закончатся.

В качестве инструмента для создания такого сервера был выбран микрофреймворк Flask, нам достаточно небольшого веб-приложения для нашей задачи. Из плюсов Flask можно выделить простоту создания веб-приложений, отсутствие сложных настроек и достаточно стабильная и быстрая работа. В качестве хранилища для команд была выбрана таблица Excel. Все задачи были сформированы заранее.
Проблема
Так как количество камер в разных помещениях, разных городов достаточно много, а время выгрузки с одного устройства сильно варьировалось, от нескольких минут до часа (Рис. 2), то было принято решение: на основе параметров для подключения к регистраторам сформировать задачи и распределить их на разных машинах, для чего и потребовался сервер.

Решение:
Начнем работу с создания сервера, для начала создаем новое окружение под приложение следующей командой в терминале:
python –m venv venv
Так как решение было использовать Flask то подключаем следующие модули:
-
«Flask» — для создания веб-приложения.
-
«openpyxl» – непосредственно для работы с excel файлом, где находятся команды.
-
«multiprocessing» — из этого модуля нам потребуются только очереди, так как они идеально подходят для нашей задачи.
Для начала импортируем библиотеку Flask и инициализируем приложение:
from flask import Flask app = Flask(__name__)
Импортируем остальные библиотеки:
from openpyxl import load_workbook from multiprocessing import Queue
Создаем и наполняем базу данных задачами, которые сервер будет возвращать клиенту. Задачи сформированы заранее, они состоят из сервера/ ip – адреса к которому подключаемся, user — логина, password — пароля, cameras — номера камеры. По умолчанию если не задавать дату и время — то Hikload выгружает видео за один текущий день. Полный список параметров для выгрузки видео можно узнать, набрав «hikload —help» (рис. 3)

Далее добавим функцию, которая возвращает команду из базы данных. Все задачи из таблицы Excel выгружаются в Queue(очередь).
data = load_workbook('test.xlsx', 'wb') sheet = data.active que = Queue() result = [] #выгрузка задач из таблицы Excel и запись в очередь for row in sheet: for cell in row: result.append(cell.value) que.put(result) result = [] #выдача команд из очереди def get_task(): return que.get() #отправка команд клиентам.
Декоратор «app.route» регистрирует URL-адрес, по которому будет доступно представление, описанное ниже и вызывается экземпляром приложения app(Flask):
@app.route('/') @app.route('/index') def index(): return ' '.join(map(str, get_task()))
Весь код сервера представлен ниже:
from flask import Flask app = Flask(__name__) from openpyxl import load_workbook from multiprocessing import Queue data = load_workbook('test.xlsx', 'wb') sheet = data.active que = Queue() result = [] #выгрузка задач из таблицы Excel и запись в очередь for row in sheet: for cell in row: result.append(cell.value) que.put(result) result = [] #выдача команд из очереди def get_task(): return que.get() #отправка команд клиентам @app.route('/') @app.route('/index') def index(): if not que.empty(): return ' '.join(map(str, get_task())) else: return if __name__ == '__main__': app.run(debug=False)
Сервер готов, очередь заполнена задачами, следующим пунктом будет разработка клиента для правильной интерпретации принятых команд и запуска соответствующих скриптов. Рецепт программы-клиента состоит из следующих ингредиентов:
-
«Requests» для обращения к серверу и получения задачи.
def get_task(): for _ in range(2): resp = rq.get('localhost') if resp != “the end”: tasks.append(list((resp.content.decode('utf-8').split(' ')))) else: return None return (tasks)
-
«Multiprocessing» для выполнения 2-х задач (можно и другое количество, зависящее от характеристик вашего устройства) параллельно.
if __name__ == '__main__': while True: tasks = [] #создаем пустой список для задач response = get_task() NUM_CORE = 2 #объявляем количество ядер read = mp.Queue() [read.put(x) for x in response] #записываем очередь командами print("read qsize", read.qsize()) for i in range(NUM_CORE): mp.Process(target=load, args=(read,)).start() #запускаем скрипт mp.join() while True: time.sleep(1)
-
Модуль «os» непосредственно, для запуска Hikload.
def load(read): while True: if not read.empty(): #проверяем заполненность очереди x = read.get() name_proc = mp.current_process().name print(f"{name_proc} stared") #выводим работающие процессы task = (f'hikload --server={x[2]} ' f'--user={x[0]} ' f'--password={x[1]} ' f'--cameras={x[3]} ' f'--downloads {x[2]+ "_" + name_proc}' ) print(task) '''Запуск скрипта с нашими командами''' os.system(task) else: break
Код клиента целиком выглядит следующим образом:
import multiprocessing as mp, os, time, requests as rq #получаем задачи с сервера def get_task(): for _ in range(2): resp = rq.get('http://127.0.0.1:5000/') if resp != 'the end': tasks.append(list((resp.content.decode('utf-8').split(' ')))) else: return None return (tasks) #запуск выгрузки видеозаписей def load(read): while True: if not read.empty(): #проверяем заполненность очереди x = read.get() name_proc = mp.current_process().name print(f"{name_proc} stared") #выводим работающие процессы task = (f'hikload --server={x[2]} ' f'--user={x[0]} ' f'--password={x[1]} ' f'--cameras={x[3] ' f'--downloads {x[2]+ "_" + name_proc}' ) print(task) '''Запуск скрипта с нашими командами''' os.system(task) else: break if __name__ == '__main__': while True: tasks = [] response = get_task() NUM_CORE = 2 #объявляем количество ядер read = mp.Queue() get_task() #вызываем функцию загрузки задач [read.put(x) for x in tasks] #записываем очередь командами print("read qsize", read.qsize()) for i in range(NUM_CORE): mp.Process(target=load, args=(read,)).start() #запускаем скрипт mp.join() while True: time.sleep(1)
Для начала запускаем сервер (Рис. 5) на локальном хосте, следующей командой: flask run

Далее запускаем клиент, команда: python main.py:

Мы видим, что очередь автоматически заполнилась 2-мя задачами (Рис. 6) и клиент начал выгрузку, в это время сервер показывает, что передал 2 команды, код 200 в ответе говорит об успешной обработке запроса (Рис. 7)

Под каждую команду создаются отдельные папки, где хранятся выгруженные видеозаписи, названия формируются параметром —download (рис.8). После отработки/выгрузки команды – клиент снова посылает запрос на сервер и получает новую команду.

И, конечно, вывод
В результате мы реализовали автоматизированное выполнение задачи, разделенной на множество подзадач. На каждом клиенте выполняются 2 выгрузки параллельно (рис.9)

Из явных преимуществ такого подхода можно выделить:
-
Загрузка видеозаписей проходит быстрее с каждым подключенным клиентом.
-
Все задачи распределяются автоматически.
-
Сервер прост в разработке, многофункционален и его можно использовать для любых, похожих на показанной в примере задач.
ссылка на оригинал статьи https://habr.com/ru/post/687032/
Добавить комментарий