Пятничный рабочий день на удалёнке уже подходил к концу, как в дверь постучали, чтобы сообщить об установке нового домофона. Узнав, что новый домофон имеет мобильное приложение, позволяющее отвечать на звонки не находясь дома, я заинтересовался и сразу же загрузил его на свой телефон. Залогинившись, я обнаружил интересную особенность этого приложения — даже без активного вызова в мою квартиру я мог смотреть в камеру домофона и открывать дверь в произвольный момент времени. «Да это же онлайн АРI к двери подъезда!» — щёлкнуло в голове. Судьба предстоящих выходных была предрешена.
Видеодемонстрация в конце статьи.

Дисклеймер
Используемая технология аутентификации на основании распознавания лиц не учитывает множества сценариев атак и недостаточна для обеспечения безопасности. Описанная система анализа кадров была активна только в моменты, когда кому-то из моей семьи было необходимо попасть в подъезд — она не совершала автоматическую обработку лиц других людей и не могла привести к допуску в подъезд третьих лиц.
Получение API
Чтобы автоматизировать управление дверью, необходимо понять, куда и в каком формате отправляет запросы само приложение. Реверс-инжиниринг — дело неоднозначное, поэтому я попытался обойтись без него и вместо этого просто перехватить свой собственный трафик. Для этой задачи я взял НТТР Тооlkit — комплекс программ, который позволяет наладить прослушивание http(s) запросов собственного Android устройства.
Первая попытка оказывается провальной — после установки на телефон Android-части инструментария и сгенерированного Certificate authority оказалось, что мобильное приложение домофона не доверяет пользовательским СА. Согласно документации, начиная с Android 7 манифест приложения должен явно изъявлять такое желание.
Так как мой телефон не поддерживает root доступ для модификации списка системных СА, я воспользовался официальным эмулятором Android, идущим в комплекте с Android Studio. После запуска эмулятора и перехвата с помощью ADB меня встретило радостное сообщение о том, что трафик от всех приложений без Certificate pinning будет успешно расшифрован.

К счастью, приложение оказалось как раз из таких — немного побродив по приложению и открыв дверь, можно переходить к анализу логов.

Всего интересными показалось три запроса:
-
Запрос на открытие двери подъезда: POST по адресу
/rest/v1/places/{place_id}/accesscontrols/{control_id}/actionsс JSON-телом{"name": "accessControlOpen"} -
Получение снимка (превью) с камеры: GET по адресу
/rest/v1/places/{place_id}/accesscontrols/{control_id}/videosnapshots -
Получение ссылки на видеопоток с аудио: GET по адресу
/rest/v1/forpost/cameras/{camera_id}/video?LightStream=0
HTTP заголовки всех трёх запросов содержат ключ Authorization — судя по всему, именно по нему происходит авторизация при выполнении запросов. Сделав пару запросов руками через Advanced REST Client, чтобы убедиться, что заголовка Authorization достаточно и в самостоятельной работе с API не осталось подводных камней, я понял, что можно откладывать эмулятор и переходить к написанию кода.
Вооружившись Python и requests, я написал необходимые для следующих этапов функции исполнения каждого из трёх действий:
HEADERS = {"Authorization": "Bearer ###"} ACTION_URL = "https://###.ru/rest/v1/places/###/accesscontrols/###/" VIDEO_URL = "https://###.ru/rest/v1/forpost/cameras/###/video?LightStream=0" def get_image(): result = requests.get(f'{ACTION_URL}/videosnapshots', headers=HEADERS) if result.status_code != 200: logging.error(f"Failed to get an image with status code {result.status_code}") return None logging.warning(f"Image received successfully in {result.elapsed.total_seconds()}sec") return result.content def open_door(): result = requests.post( f'{ACTION_URL}/actions', headers=HEADERS, json={"name": "accessControlOpen"}) if result.status_code != 200: logging.error(f"Failed to open the door with status code {result.status_code}") return False logging.warning(f"Door opened successfully in {result.elapsed.total_seconds()}sec") return True def get_videostream_link(): result = requests.get(VIDEO_URL, headers=HEADERS) if result.status_code != 200: logging.error(f"Failed to get stream link with status code {result.status_code}") return False logging.warning(f"Stream link received successfully in {result.elapsed.total_seconds()}sec") return result.json()['data']['URL']
Поиск знакомых лиц в кадре
Прежде чем начать этот раздел, нужно рассказать пару слов об уже имеющихся на тот момент у меня в распоряжении серверных мощностях — это недорогая виртуальная машина с доступом ко всего одному потоку Intel(R) Xeon(R) CPU E5-2650L v3 @ 1.80GHz, 1GB оперативной памяти и 0 GPU. Тратиться на более дорогую конфигурацию не хотелось, а значит, нужно было попробовать выжать максимум из имеющихся ресурсов.
На данном этапе проектом заинтересовалась моя жена, хорошо разбирающаяся в машинном обучении и в итоге взявшая на себя эту задачу. Для этой работы был выбран OpenVINO Toolkit — инструментарий от Intel, в том числе заточенный как раз на запуск моделей машинного обучения на CPU.
Непродолжительный поиск существующих решений привёл на страницу Interactive Face Recognition Demo — официального демо, показывающего ровно необходимый функционал сравнения видимых в кадре лиц с базой заранее сохранённых. Единственная проблема состояла в том, что данный пример по каким-то причинам исчез после релиза 2020.3, а удобная установка пакета через pip у проекта появилась только с 2021.1. Было решено установить последнюю версию OpenVINO и адаптировать код под неё.
К счастью, гит помнит всё и получить из репозитория проекта код демо и нужные модели не составило труда. После удаления всей работы с командной строкой (взяв для всего значения по умолчанию), визуализацией и видеопотоком вебкамеры, был получен класс, способный распознавать лица в конкретном кадре:
class ImageProcessor: def __init__(self): self.frame_processor = FrameProcessor() def process(self, image): detections = self.frame_processor.process(image) labels = [] for roi, landmarks, identity in zip(*detections): label = self.frame_processor.face_identifier.get_identity_label( identity.id) labels.append(label) return labels
После создания базы лиц из десятка селфи можно было переходить к тестированию фактической производительности полученного решения на имеющемся железе. В качестве подопытных картинок я взял фотографию себя и фото пустой улицы, сделанные на домофонную камеру функцией get_image() выше.
100 runs on an image with known face: Total time: 7.356s Time per frame: 0.007s FPS: 135.944 100 runs on an image without faces: Total time: 2.985s Time per frame: 0.003s FPS: 334.962
Очевидно, что показанная производительность с запасом покрывает нужды детектирования лиц в реальном времени.
1 FPS: Работа со снимками с камеры
Итак, на этом этапе у меня уже был класс для поиска заданных лиц в кадре, а также функции получения снимка с камеры и ссылки на видеопоток. С видео на тот момент я никогда не работал, потому для MVP решил переложить работу по выделению кадров на сервер и снова воспользоваться get_image().
class ImageProcessor: # <...> def process_single_image(self, image): nparr = np.fromstring(image, np.uint8) img_np = cv2.imdecode(nparr, cv2.IMREAD_COLOR) labels = self.process(img_np) return labels def snapshot_based_intercom_id(): processor = ImageProcessor() last_open_door_time = time.time() while True: start_time = time.time() image = get_image() result = processor.process_single_image(image) logging.info(f'{result} in {time.time() - start_time}s') # Successfull detections are "face{N}" if any(['face' in res for res in result]): if start_time - last_open_door_time > 5: open_door() with open(f'images/{start_time}_OK.jfif', 'wb') as f: f.write(image) last_open_door_time = start_time
Цикл получает картинки от домофона, ищет на них знакомые лица, открывает дверь в случае успешной детекции и сохраняет фото на память. Важно было не забыть ограничить частоту отправки команды открытия двери, т.к. лицо обычно распознаётся ещё на подходах к двери.

Заработало! Полностью довольный первым запуском, я вернулся в квартиру. Единственное, что портило впечатление от системы распознавания — время реакции на появление лица в кадре, т.к. время отклика API оставляло желать лучшего. Низкая частота поступления данных, 0.7с на получение картинки и 0.6с на открытие двери, давали видимый невооружённым взглядом лаг.
До 30 FPS: Обработка видеопотока
Получить кадры из видео оказалось на удивление просто:
vcap = cv2.VideoCapture(link) success, frame = vcap.read()
Замеры показали, что камера домофона способна выдавать стабильные 30 FPS. Проблемой оказалось поддержание меньшей частоты кадров: метод read() возвращает последний необработанный кадр из внутренней очереди кадров. Если эта очередь наполняется видеопотоком быстрее, чем вычитывается, то обрабатываемые кадры начинают отставать от реального времени, что приводит к нежелательной задержке. Дополнительно, с практической точки зрения, распознавать лица 30 раз в секунду — пустая трата вычислительных ресурсов, так как обычно люди подходят к двери подъезда с небольшой скоростью.
Первым потенциальным решением было установить размер внутренней очереди: vcap.set(CV_CAP_PROP_BUFFERSIZE, 0);. Согласно найденной информации, такой трюк должен был хорошо работать с любой конфигурацией системы для версий OpenCV выше 3.4, но по какой-то причине, так и не оказал никакого влияния в моём случае. Единственной рабочей альтернативой стал подход, описанный в этом ответе со StackOverflow — завести отдельный поток, читающий кадры из камеры на максимально возможной скорости и сохраняющий последний в поле класса для дальнейшего доступа (впоследствии оказалось, что именно этот цикл ответственен за большую часть потребления процессора).
Получилась модификация ImageProcessor для обработки видеопотока с частотой 3 кадра в секунду:
class CameraBufferCleanerThread(threading.Thread): def __init__(self, camera, name='camera-buffer-cleaner-thread'): self.camera = camera self.last_frame = None self.finished = False super(CameraBufferCleanerThread, self).__init__(name=name) self.start() def run(self): while not self.finished: ret, self.last_frame = self.camera.read() def __enter__(self): return self def __exit__(self, type, value, traceback): self.finished = True self.join() class ImageProcessor: # <...> def process_stream(self, link): vcap = cv2.VideoCapture(link) interval = 0.3 # ~3 FPS with CameraBufferCleanerThread(vcap) as cam_cleaner: while True: frame = cam_cleaner.last_frame if frame is not None: yield (self.process(frame), frame) else: yield (None, None) time.sleep(interval)
И соответствующая модификация snapshot_based_intercom_id:
def stream_based_intercom_id(): processor = ImageProcessor() link = get_videostream_link() # To notify about delays last_time = time.time() last_open_door_time = time.time() for result, np_image in processor.process_stream(link): current_time = time.time() delta_time = current_time - last_time if delta_time < 1: logging.info(f'{result} in {delta_time}') else: logging.warning(f'{result} in {delta_time}') last_time = current_time if result is None: continue if any(['face' in res for res in result]): if current_time - last_open_door_time > 5: logging.warning( f'Hey, I know you - {result[0]}! Opening the door...') last_open_door_time = current_time open_door() cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)
Тест в реальных условиях показал ощутимое падение времени отклика — при хорошем освещении подъездная дверь оказывалась открытой ещё до того, как я успевал подойти к ней вплотную.

Управление с помощью Telegram бота
Сама система доказала свою работоспособность и теперь хотелось создать для неё удобный интерфейс для включения/выключения. Телеграм бот показался отличным вариантом решения для этой задачи.
С помощью пакета python-telegram-bot была написана простая оболочка, принимающая в себя callback включения/выключения системы и список доверенных никнеймов.
class TelegramInterface: def __init__(self, login_whitelist, state_callback): self.state_callback = state_callback self.login_whitelist = login_whitelist self.updater = Updater( token = "###", use_context = True) self.run() def run(self): dispatcher = self.updater.dispatcher dispatcher.add_handler(CommandHandler("start", self.start)) dispatcher.add_handler(CommandHandler("run", self.run_intercom)) dispatcher.add_handler(CommandHandler("stop", self.stop_intercom)) self.updater.start_polling() def run_intercom(self, update: Update, context: CallbackContext): user = update.message.from_user update.message.reply_text( self.state_callback(True) if user.username in self.login_whitelist else 'not allowed', reply_to_message_id=update.message.message_id) def stop_intercom(self, update: Update, context: CallbackContext): user = update.message.from_user update.message.reply_text( self.state_callback(False) if user.username in self.login_whitelist else 'not allowed', reply_to_message_id=update.message.message_id) def start(self, update: Update, context: CallbackContext) -> None: update.message.reply_text('Hi!') class TelegramBotThreadWrapper(threading.Thread): def __init__(self, state_callback, name='telegram-bot-wrapper'): self.whitelist = ["###", "###"] self.state_callback = state_callback super(TelegramBotThreadWrapper, self).__init__(name=name) self.start() def run(self): self.bot = TelegramInterface(self.whitelist, self.state_callback)
И следующая ступень эволюции функции intercom_id, обрабатывающая синхронизацию между потоками обработки данных и получения команд от бота:
def stream_based_intercom_id_with_telegram(): processor = ImageProcessor() loop_state_lock = threading.Lock() loop_should_run = False loop_should_change_state_cv = threading.Condition(loop_state_lock) is_loop_finished = True loop_changed_state_cv = threading.Condition(loop_state_lock) def stream_processing_loop(): nonlocal loop_should_run nonlocal loop_should_change_state_cv nonlocal is_loop_finished nonlocal loop_changed_state_cv while True: with loop_should_change_state_cv: loop_should_change_state_cv.wait_for(lambda: loop_should_run) is_loop_finished = False loop_changed_state_cv.notify_all() logging.warning(f'Loop is started') link = get_videostream_link() last_time = time.time() last_open_door_time = time.time() for result, np_image in processor.process_stream(link): with loop_should_change_state_cv: if not loop_should_run: is_loop_finished = True loop_changed_state_cv.notify_all() logging.warning(f'Loop is stopped') break current_time = time.time() delta_time = current_time - last_time if delta_time < 1: logging.info(f'{result} in {delta_time}') else: logging.warning(f'{result} in {delta_time}') last_time = current_time if result is None: continue if any(['face' in res for res in result]): if current_time - last_open_door_time > 5: logging.warning(f'Hey, I know you - {result[0]}! Opening the door...') last_open_door_time = current_time open_door() cv2.imwrite(f'images/{current_time}_OK.jpg', np_image) def state_callback(is_running): nonlocal loop_should_run nonlocal loop_should_change_state_cv nonlocal is_loop_finished nonlocal loop_changed_state_cv with loop_should_change_state_cv: if is_running == loop_should_run: return "Intercom service state is not changed" loop_should_run = is_running if loop_should_run: loop_should_change_state_cv.notify_all() loop_changed_state_cv.wait_for(lambda: not is_loop_finished) return "Intercom service is up" else: loop_changed_state_cv.wait_for(lambda: is_loop_finished) return "Intercom service is down" telegram_bot = TelegramBotThreadWrapper(state_callback) logging.warning("Bot is ready") stream_processing_loop()
Результат
Видео:
Послесловие
Несмотря на возможности, которые технология умных домофонов несёт жильцам, объединённые в единую сеть сотни (тысячи?) подъездных дверей с камерами и микрофонами (да, в произвольно получаемом видеопотоке есть и аудио!), ведущими непрерывную запись — как по мне, скорее пугающее явление, открывающее новые возможности для нарушения приватности.
Я бы предпочёл, чтобы доступ к видеопотоку предоставлялся только в момент звонка в квартиру и ведущаяся трёхдневная запись, позиционирующаяся как средство раскрытия правонарушений, хранилась не на серверах компании, а непосредственно в домофоне, с возможностью доступа к ней по запросу. Или не велась вовсе.
ссылка на оригинал статьи https://habr.com/ru/post/561502/
Добавить комментарий