Task-Сервер на «Flask»: как заставить несколько компьютеров решать одну задачу

от автора

Привет, Хабр!

Подготовительным этапом для видеоаналитики с применением методов машинного обучения — является выгрузка записей из видеорегистраторов Hikvision, что является достаточно длительной задачей, особенно если регистраторов несколько, давайте попытаемся разобраться как выполнить эту работу быстрее и удобнее, используя сервер.

Основанием для разработки «Task-Сервера» —  послужила задача по выгрузке видеозаписей с видеорегистраторов Hikvision. В работе была использована библиотека Hikload которая подключалась к регистраторам и загружала видеоролики.

 Для начала немного о «Task-Сервере» (Рис. 1), в данном случае это небольшой сервер, хранящий в базе данных или файле заранее подготовленные строки, которые будут переданы в качестве параметров клиентам, подключенным к хосту. Клиент же в свою очередь запускает скрипт с полученными данными и после выполнения запрашивает новую задачу до тех пор, пока задачи не закончатся.

Рис. 1 – схема Task-Сервера
Рис. 1 – схема Task-Сервера

В качестве инструмента для создания такого сервера был выбран микрофреймворк Flask, нам достаточно небольшого веб-приложения для нашей задачи. Из плюсов Flask можно выделить простоту создания веб-приложений, отсутствие сложных настроек и достаточно стабильная и быстрая работа.  В качестве хранилища для команд была выбрана таблица Excel. Все задачи были сформированы заранее.

Проблема

Так как количество камер в разных помещениях, разных городов достаточно много, а время выгрузки с одного устройства сильно варьировалось, от нескольких минут до часа (Рис. 2), то было принято решение: на основе параметров для подключения к регистраторам сформировать задачи и распределить их на разных машинах, для чего и потребовался сервер.

Рис. 2 Процесс выгрузки видео
Рис. 2 Процесс выгрузки видео

Решение:

Начнем работу с создания сервера, для начала создаем новое окружение под приложение следующей командой в терминале:

python –m venv venv

Так как решение было использовать Flask то подключаем следующие модули:

  1. «Flask» —  для создания веб-приложения.

  2. «openpyxl» – непосредственно для работы с excel файлом, где находятся команды.

  3. «multiprocessing» — из этого модуля нам потребуются только очереди, так как они идеально подходят для нашей задачи.

Для начала импортируем библиотеку Flask и инициализируем приложение:

from flask import Flask app = Flask(__name__)

Импортируем остальные библиотеки:

from openpyxl import load_workbook from multiprocessing import Queue

Создаем и наполняем базу данных задачами, которые сервер будет возвращать клиенту. Задачи сформированы заранее, они состоят из сервера/ ip – адреса к которому подключаемся, user — логина, password — пароля, cameras — номера камеры. По умолчанию если не задавать дату и время — то Hikload выгружает видео за один текущий день. Полный список параметров для выгрузки видео можно узнать, набрав «hikload —help» (рис. 3)

Рис. 3 Все параметры для выгрузки с Hikload
Рис. 3 Все параметры для выгрузки с Hikload

Далее добавим функцию, которая возвращает команду из базы данных. Все задачи из таблицы Excel выгружаются в Queue(очередь).

data = load_workbook('test.xlsx', 'wb')  sheet = data.active que = Queue() result = []  #выгрузка задач из таблицы Excel и запись в очередь for row in sheet:     for cell in row:         result.append(cell.value)     que.put(result)     result = []  #выдача команд из очереди     def get_task():     return que.get()  #отправка команд клиентам.

Декоратор «app.route» регистрирует URL-адрес, по которому будет доступно представление, описанное ниже и вызывается экземпляром приложения app(Flask):

@app.route('/') @app.route('/index') def index(): return ' '.join(map(str, get_task()))

Весь код сервера представлен ниже:

from flask import Flask  app = Flask(__name__)  from openpyxl import load_workbook from multiprocessing import Queue  data = load_workbook('test.xlsx', 'wb')  sheet = data.active que = Queue() result = []  #выгрузка задач из таблицы Excel и запись в очередь for row in sheet:     for cell in row:         result.append(cell.value)     que.put(result)     result = []  #выдача команд из очереди     def get_task():     return que.get()   #отправка команд клиентам     @app.route('/') @app.route('/index') def index():     if not que.empty():         return ' '.join(map(str, get_task()))     else:         return  if __name__ == '__main__':     app.run(debug=False)

Сервер готов, очередь заполнена задачами, следующим пунктом будет разработка клиента для правильной интерпретации принятых команд и запуска соответствующих скриптов. Рецепт программы-клиента состоит из следующих ингредиентов:

  1. «Requests» для обращения к серверу и получения задачи.

def get_task():     for _ in range(2):         resp = rq.get('localhost')         if resp != “the end”:             tasks.append(list((resp.content.decode('utf-8').split(' '))))         else:             return None     return (tasks)
  1. «Multiprocessing» для выполнения 2-х задач (можно и другое количество, зависящее от характеристик вашего устройства) параллельно.

if __name__ == '__main__':     while True:    tasks = [] #создаем пустой список для задач         response = get_task()         NUM_CORE = 2 #объявляем количество ядер         read = mp.Queue()         [read.put(x) for x in response] #записываем очередь командами         print("read qsize", read.qsize())          for i in range(NUM_CORE):                mp.Process(target=load, args=(read,)).start() #запускаем скрипт  mp.join()         while True:             time.sleep(1)
  1. Модуль «os» непосредственно, для запуска Hikload.

def load(read):     while True:         if not read.empty(): #проверяем заполненность очереди             x = read.get()              name_proc = mp.current_process().name             print(f"{name_proc} stared") #выводим работающие процессы             task = (f'hikload --server={x[2]} '                        f'--user={x[0]} '                       f'--password={x[1]} '                       f'--cameras={x[3]} '                       f'--downloads {x[2]+ "_" + name_proc}'                     )             print(task)             '''Запуск скрипта с нашими командами'''             os.system(task)         else:              break

Код клиента целиком выглядит следующим образом:

import multiprocessing as mp, os, time, requests as rq  #получаем задачи с сервера def get_task():     for _ in range(2):         resp = rq.get('http://127.0.0.1:5000/')         if resp != 'the end':             tasks.append(list((resp.content.decode('utf-8').split(' '))))         else:             return None     return (tasks)  #запуск выгрузки видеозаписей       def load(read):     while True:         if not read.empty(): #проверяем заполненность очереди             x = read.get()              name_proc = mp.current_process().name             print(f"{name_proc} stared") #выводим работающие процессы             task = (f'hikload --server={x[2]} '                        f'--user={x[0]} '                       f'--password={x[1]} '                       f'--cameras={x[3] '                       f'--downloads {x[2]+ "_" + name_proc}'                     )             print(task)             '''Запуск скрипта с нашими командами'''             os.system(task)         else:              break  if __name__ == '__main__':     while True:         tasks = []         response = get_task()         NUM_CORE = 2 #объявляем количество ядер         read = mp.Queue()         get_task() #вызываем функцию загрузки задач         [read.put(x) for x in tasks] #записываем очередь командами         print("read qsize", read.qsize())          for i in range(NUM_CORE):                mp.Process(target=load, args=(read,)).start() #запускаем скрипт             mp.join()         while True:             time.sleep(1)

Для начала запускаем сервер (Рис. 5) на локальном хосте, следующей командой: flask run

Рис. 5 Запуск сервера на локальном хосте
Рис. 5 Запуск сервера на локальном хосте

Далее запускаем клиент, команда: python main.py:

Рис. 6 Запуск клиента и начало загрузки видеозаписей
Рис. 6 Запуск клиента и начало загрузки видеозаписей

Мы видим, что очередь автоматически заполнилась 2-мя задачами (Рис. 6) и клиент начал выгрузку, в это время сервер показывает, что передал 2 команды, код 200 в ответе говорит об успешной обработке запроса (Рис. 7)

Рис. 7 Ответ сервера на запрос
Рис. 7 Ответ сервера на запрос

Под каждую команду создаются отдельные папки, где хранятся выгруженные видеозаписи, названия формируются параметром —download (рис.8). После отработки/выгрузки команды – клиент снова посылает запрос на сервер и получает новую команду.

Рис. 8 Папки с выгруженными данными
Рис. 8 Папки с выгруженными данными

И, конечно, вывод

В результате мы реализовали автоматизированное выполнение задачи, разделенной на множество подзадач. На каждом клиенте выполняются 2 выгрузки параллельно (рис.9)

Рис. 9 Выполнение задач в командной строке
Рис. 9 Выполнение задач в командной строке

Из явных преимуществ такого подхода можно выделить:

  1. Загрузка видеозаписей проходит быстрее с каждым подключенным клиентом.

  2. Все задачи распределяются автоматически.

  3. Сервер прост в разработке, многофункционален и его можно использовать для любых, похожих на показанной в примере задач.


ссылка на оригинал статьи https://habr.com/ru/post/687032/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *