Потоковый проект в режиме реального времени с использованием данных смартфона

от автора

Девайсы повсюду. Смартфоны, холодильники, дверные звонки, часы, медицинские датчики, системы безопасности и фитнес-трекеры — все это лишь некоторые из них, которые стали обычным явлением. Они постоянно записывают потенциально высокочастотную информацию и образуют сеть, известную как «Интернет вещей», или IoT, представляя обширные источники данных.

Хотя ресурсов по этой теме достаточно, немногие приводят примеры с реальными данными, доступными любому желающему. Переходя от статьи к статье, чтобы узнать о системах, управляемых событиями, и потоковых технологиях, таких как Apache Kafka, Harrison Hoffman наткнулся на приложение для смартфонов Sensor Logger, которое позволяет пользователям передавать данные с датчиков движения на свои телефоны. Такой вариант показался идеальным способом обучения, поэтому родился проект «smartphone_sensor_stream». Этот проект использует FastAPI, Kafka, QuestDB и Docker для визуализации данных датчиков в реальном времени на информационной панели.

В этой статье мы рассмотрим все основные компоненты этого проекта на продвинутом уровне. Все необходимое для локального запуска проекта доступно на GitHub, а краткая демонстрация доступна на YouTube. 

Архитектура проекта

Давайте начнем с рассмотрения архитектуры этого проекта. То есть с того, как именно данные будут поступать со смартфонов на панель мониторинга:

Каждый смартфон отправляет показания датчиков акселерометра, гироскопа и магнитометра через POST-запрос в приложение FastAPI. Производитель FastAPI асинхронно записывает показания датчика в раздел Kafka в виде JSON, данные из тела запроса. Каждый объект JSON обрабатывается процессом python, консумером, а потом сохраняется в таблице Quest DB. Как только данные попадают в базу данных, они становятся доступными для любой зависимой службы или приложения. В первой части этого проекта мы будем выводить показания датчиков на панель мониторинга, используя события, отправляемые сервером (SSE).

Структура каталогов и компоновка Docker Compose

Этот проект представляет собой набор небольших сервисов, которые взаимодействуют друг с другом для получения данных со смартфонов на информационную панель. Вот структура каталогов:

|-producer  | |-app  | | |-core  | | | |-config.py  | | |-__init__.py  | | |-schemas  | | | |-sensors.py  | | |-main.py  | |-requirements.txt  | |-Dockerfile  | |-entrypoint.sh |-db_consumer  | |-app  | | |-core  | | | |-config.py  | | |-models  | | | |-sensors.py  | | |-db  | | | |-ingress.py  | | |-main.py  | |-requirements.txt  | |-Dockerfile  | |-entrypoint.sh |-ui_server  | |-app  | | |-core  | | | |-config.py  | | |-models  | | | |-sensors.py  | | |-static  | | | |-js  | | | | |-main.js  | | |-db  | | | |-data_api.py  | | |-templates  | | | |-index.html  | | |-main.py  | |-requirements.txt  | |-Dockerfile  | |-entrypoint.sh |-README.md |-.gitignore |-.env |-docker-compose.yml

Мы напишем три сервиса: продюсер, консумер и пользовательский интерфейс. Каждая служба упакована с помощью Dockerfile и организована при помощи docker-compose. Docker-compose позволяет нам запускать сервисы, которые мы пишем, с внешними сервисами, Kafka, Zookeeper и QuestDB, в виде отдельных контейнеров, подключенных через внутреннюю сеть. Все, что нам нужно для организации служб в этом проекте, находится в файле docker-compose:

version: '3.8'  services:   zookeeper:     image: bitnami/zookeeper:latest     ports:       - 2181:2181     environment:       - ALLOW_ANONYMOUS_LOGIN=yes    kafka:     image: bitnami/kafka:latest     ports:       - 9092:9092       - 9093:9093     environment:       - KAFKA_BROKER_ID=1       - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092       - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092       - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181       - ALLOW_PLAINTEXT_LISTENER=yes       - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT       - KAFKA_CFG_LISTENERS=CLIENT://:9092       - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092       - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT     depends_on:       - zookeeper    questdb:     image: questdb/questdb     container_name: questdb     restart: always     expose:       - 9000       - 9009       - 9003     ports:       - 8812:8812     volumes:       - ./questdb:/root/.questdb     environment:       - QDB_LOG_W_STDOUT_LEVEL=ERROR       - QDB_LOG_W_FILE_LEVEL=ERROR       - QDB_LOG_W_HTTP_MIN_LEVEL=ERROR       - QDB_SHARED_WORKER_COUNT=2       - QDB_PG_USER=${DB_USER}       - QDB_PG_PASSWORD=${DB_PASSWORD}       - QDB_TELEMETRY_ENABLED=false       - QDB_CAIRO_SQL_COPY_ROOT=./   producer:     build:       context: ./producer       dockerfile: Dockerfile     command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 8000     ports:       - 8000:8000     env_file:       - .env     depends_on:       - kafka       - zookeeper   db_consumer:     build:       context: ./db_consumer       dockerfile: Dockerfile     command: python main.py     env_file:       - .env     depends_on:       - kafka       - zookeeper   ui_server:     build:       context: ./ui_server       dockerfile: Dockerfile     command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 5000     ports:       - 5000:5000     env_file:       - .env     depends_on:       - db_consumer   kafka-ui:     image: provectuslabs/kafka-ui     container_name: kafka-ui     ports:       - "18080:8080"     restart: always     environment:       - KAFKA_CLUSTERS_0_NAME=local       - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092       - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181     depends_on:       - kafka       - zookeeper volumes:   zookeeper_data:     driver: local   kafka_data:     driver: local

Обратите внимание на четыре сервиса, которые мы не пишем сами (к счастью): Zookeeper, Kafka, QuestDB и Kafka-UI. Эти сервисы работают совместно с производителем, потребителем и пользовательским интерфейсом для создания проекта. Мы рассмотрим каждый сервис в отдельности, но сначала нам нужно разобраться с источником данных.

Регистратор датчиков

Sensor Logger — это приложение для iOS и Android, которое позволяет пользователям регистрировать показания датчиков, связанных с движением, со своих смартфонов. Пользователи могут просматривать показания датчиков в режиме реального времени, экспортировать данные в виде файлов и отправлять оперативные данные на сервер по протоколу HTTP. Этот проект использует функциональность HTTP для извлечения показаний датчиков. Чтобы настроить регистратор, для начала убедитесь, что выбраны все следующие датчики:

Мы будем получать показания акселерометра, гироскопа и магнитометра телефона. Далее нам нужно настроить параметры регистратора датчиков таким образом, чтобы он знал, куда именно отправлять данные:

Наиболее важным компонентом является обеспечение правильности “Push URL” — это конечная точка производителя FastAPI, которая принимает необработанные показания датчиков с помощью POST-запросов. Мы будем использовать наш компьютер в качестве сервера, поэтому нам нужно определить соответствующий IP-адрес. На Mac это находится в разделе Системные настройки -> Сеть.

Обратите внимание, что IP-адрес компьютера обычно уникален для сети WI-FI, что означает, что новый IP-адрес выделяется каждый раз, когда компьютер подключается к новой сети. Поэтому крайне важно, чтобы смартфон и главный компьютер находились в одной сети. Производитель FastAPI принимает показания датчиков на:

http://%7Byour_ip_address%7D:8000/phone-producer

Вставьте приведенный выше URL-адрес в поле «Push URL», и регистратор датчиков должен быть готов к работе!

Кафка и ZooKeeper

В этом разделе не будем вдаваться в подробности о Kafka, поскольку на платформе доступно много ресурсов. Однако можно сказать, что Kafka — это высокопроизводительный фреймворк для хранения и чтения потоковых данных. Фундаментальная структура данных Кафки — журнал. Приложения, которые записывают сообщения в журнал, называются продюсерами. В отличие от очереди, сообщения в журнале сохраняются даже после прочтения — это позволяет нескольким приложениям, известным как консумеры, читать одновременно с разных позиций. 

Для простоты в этом проекте есть только один продюсер — приложение FastAPI, которое записывает необработанные показания датчиков в Kafka, и один консумер, процесс на python, который считывает сообщения из Kafka и форматирует их в базе данных. Zookeeper — это сервис, который помогает управлять различными компонентами Kafka.

Для локального запуска Kafka и Zookeeper необходимы только два образа docker:

zookeeper:     image: bitnami/zookeeper:latest     ports:       - 2181:2181     environment:       - ALLOW_ANONYMOUS_LOGIN=yes        kafka:     image: bitnami/kafka:latest     ports:       - 9092:9092       - 9093:9093     environment:       - KAFKA_BROKER_ID=1       - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092       - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092       - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181       - ALLOW_PLAINTEXT_LISTENER=yes       - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT       - KAFKA_CFG_LISTENERS=CLIENT://:9092       - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092       - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT     depends_on:       - zookeeper kafka-ui:     image: provectuslabs/kafka-ui     container_name: kafka-ui     ports:       - "18080:8080"     restart: always     environment:       - KAFKA_CLUSTERS_0_NAME=local       - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092       - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181     depends_on:       - kafka       - zookeeper

Мы будем использовать дистрибутив Kafka и Zookeeper от Bitmani. Kafka-UIImage позволяет пользователям взаимодействовать с кластерами Kafka через веб-приложение, но не требуется для этого проекта. Сохраните приведенный выше файл docker-compose как docker-compose.yml, запустите docker-compose, и графический интерфейс, подобный следующему, должен быть доступен по адресу http://localhost:18080/.

Информация о брокерах, темах и консумерах будет добавляться на эту панель мониторинга по мере добавления компонентов в систему.

Продюсер

Пока что у нас есть регистратор датчиков, настроенный для отправки необработанных показаний датчиков на сервер, и экземпляр Kafka готов к приему этих показаний. Следующий шаг — создать мост между исходными данными и Kafka-продюсером. Продюсером в этом проекте является быстрое API-приложение, которое принимает данные, отправляемые со смартфонов, и записывает их в журнал Kafka. Вот макет продюсера:

|-producer  | |-app  | | |-core  | | | |-config.py  | | |-__init__.py  | | |-schemas  | | | |-sensors.py  | | |-main.py  | |-requirements.txt  | |-Dockerfile  | |-entrypoint.sh

Мы не будем просматривать каждый файл в каталоге продюсера, поскольку все доступно на GitHub. Вместо этого давайте взглянем на main.py управляющий скрипт API производителя:

import json from fastapi import FastAPI import asyncio from aiokafka import AIOKafkaProducer from schemas.sensors import SensorReading, SensorResponse from core.config import app_config from loguru import logger  app = FastAPI(title=app_config.PROJECT_NAME)  loop = asyncio.get_event_loop()  producer = AIOKafkaProducer(     loop=loop,     client_id=app_config.PROJECT_NAME,     bootstrap_servers=app_config.KAFKA_URL )  @app.on_event("startup") async def startup_event():     await producer.start()  @app.on_event("shutdown") async def shutdown_event():     await producer.stop()  @app.post("/phone-producer/") async def kafka_produce(data: SensorReading):      """     Produce a message containing readings from a smartphone sensor.     Parameters     ----------     data : SensorReading         The request body containing sensor readings and metadata.     Returns     -------     response : SensorResponse         The response body corresponding to the processed sensor readings         from the request.     """      await producer.send(app_config.TOPIC_NAME, json.dumps(data.dict()).encode("ascii"))      response = SensorResponse(         messageId=data.messageId,         sessionId=data.sessionId,         deviceId=data.deviceId     )      logger.info(response)      return response

Строка 9 создает экземпляр объекта Fast API. Строки 11-17 создают экземпляр объекта Kafka-продюсер с помощью Aiokafka. Aiokafka позволяет нам записывать сообщения в Kafka асинхронно. Это значит, что нам не нужно ждать, пока Kafka получит и обработает сообщение, в строке 45, прежде чем мы перейдем к следующей строке кода. Вместо этого Aiokafka отправляет текущее сообщение в Kafka и почти мгновенно готова выдать другое сообщение. Строки 27-55 определяют маршрут, по которому будут приниматься необработанные показания датчиков. Чтобы лучше понять это, давайте взглянем на формат тела запроса, который ожидает этот маршрут — аргумент data):

{"messageId": 20,  "sessionId": "4bf3b3b9-a241-4aaa-b1d3-c05100df9976",  "deviceId": "86a5b0e3-6e06-40e2-b226-5a72bd39b65b",  "payload": [{"name": "accelerometeruncalibrated",               "time": "1671406719721160400",               "values": {"z": -0.9372100830078125,                          "y": -0.3241424560546875,                           "x": 0.0323486328125}},              {"name": "magnetometeruncalibrated",               "time": "1671406719726579500",               "values": {"z": -5061.64599609375,                          "y": 591.083251953125,                          "x": 3500.541015625}},              {"name": "gyroscopeuncalibrated",               "time": "1671406719726173400",               "values": {"z": -0.004710599314421415,                          "y": -0.013125921599566936,                          "x": 0.009486978873610497}},  ... ]}

Каждое тело запроса представляет собой объект JSON с записями «MessageId», «SessionID», «DeviceID» и «payload». Смартфоны однозначно идентифицируются по их «идентификатору устройства». Каждый раз, когда телефон начинает новый поток, для него создается новый «sessionId». Запись «MessageId» указывает порядок расположения сообщений в последовательности из текущего сеанса. Запись «payload»  представляет собой массив объектов JSON, которые содержат показания для каждого датчика, настроенного в Sensor Logger. Каждая запись «payload»  содержит имя датчика, время записи показаний по времени unix и само считывание. Мы работаем исключительно с трехосными датчиками, поэтому каждый датчик должен иметь показания «x», «y» и «z», соответствующие трем пространственным измерениям.

Маршрут FastAPI записывает необработанное тело запроса непосредственно в раздел Kafka, в строке 45, а метаданные регистрируются и возвращаются в строках 47-55. Этот маршрут доступен по адресу http://{your_ip_address}:8000/phone-producer, как описано в разделе «Регистратор датчиков» (Sensor Logger) . Все запросы проверяются объектом Pydantic SensorReading. То есть, любой запрос, который не соответствует формату регистратора датчиков, не будет обработан маршрутом:

from pydantic import BaseModel, validator from datetime import datetime from typing import List, Dict, Union   class SensorReading(BaseModel):      """     Base model class for incoming requests from smartphone sensors     Attributes     ----------     messageId : int         The identifier of a message in the current session     sessionId : int         The identifier of a session     deviceId : int         The identifier of the device sending the data     payload : List[Dict[str:Union[str, int, Dict]]]         The payload of the request containing sensor readings         and metadata about the readings     """      messageId: int     sessionId: str     deviceId: str     payload: List[Dict[str, Union[str, int, Dict]]]      class SensorResponse(BaseModel):      """     Base model class for the response of the sensor request endpoint     Attributes     ----------     messageId : int         The identifier of a message in the current session     sessionId : int         The identifier of a session     deviceId : int         The identifier of the device sending the data     timestamp : str         The timestamp when a sensor request was processed     """     messageId: str     sessionId: str     deviceId: str     timestamp: str = ""      @validator("timestamp", pre=True, always=True)     def set_datetime_utcnow(cls, v):         return str(datetime.utcnow())

Конфигурация для производителя обрабатывается с помощью переменных окружения, которые считываются объектом Pedantic Base Settings:

from pydantic import BaseSettings, validator  # Load environment variables into a pydantic BaseSetting object class AppConfig(BaseSettings):      PROJECT_NAME : str      KAFKA_HOST : str      KAFKA_PORT : str     TOPIC_NAME : str      KAFKA_URL : str = ""      class Config:          case_sensitive = True      @validator("KAFKA_URL", pre=True, always=True)     def set_kafka_url(cls, v, values, **kwargs):         return values['KAFKA_HOST'] + ":" + values['KAFKA_PORT']   app_config = AppConfig()

Переменные окружения хранятся в файле .env:

# Kafka config PROJECT_NAME=phone_stream_producer TOPIC_NAME=raw-phone-stream KAFKA_HOST=kafka KAFKA_PORT=9092

и передаются проидюсеру в файле docker-compose, строка 9 ниже:

producer:     build:       context: ./producer       dockerfile: Dockerfile     command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 8000     ports:       - 8000:8000     env_file:       - .env     depends_on:       - kafka       - zookeeper

Обратите внимание, что значение host в команде запуска равно 0.0.0.0. Это позволяет получить доступ к продюсеру по его IP-адресу с любого устройства в локальной сети.

Консумер

Теперь у нас есть инфраструктура для потоковой передачи данных датчиков со смартфонов в Fast API producer и Kafka. Следующим шагом является создание процесса — консумера, который считывает данные из Kafka и что-то делает с данными. Консумеры могут нести ответственность за все, что связано с чтением данных, хранящихся в журнале, и манипулированием ими. Для этого проекта они будут использоваться для преобразования необработанных показаний датчиков и сохранения их в базе данных временных рядов, известной как QuestDB. Вот структура каталогов для консумера:

|-db_consumer  | |-app  | | |-core  | | | |-config.py  | | |-models  | | | |-sensors.py  | | |-db  | | | |-ingress.py  | | |-main.py  | |-requirements.txt  | |-Dockerfile  | |-entrypoint.sh

Перед созданием консумера нам нужно создать экземпляр Quest DB. Quest DB — это высокопроизводительная база данных временных рядов с открытым исходным кодом и API, совместимым с Postgres. Это означает, что мы можем запрашивать таблицы Quest DB так, как если бы они были таблицами Postgres, ориентированными на строки. При этом  мы пользуемся преимуществами таблиц, ориентированных на столбцы. Мы можем запустить QuestDB с помощью docker:

questdb:     image: questdb/questdb     container_name: questdb     restart: always     expose:       - 9000       - 9009       - 9003     ports:       - 8812:8812     volumes:       - ./questdb:/root/.questdb     environment:       - QDB_LOG_W_STDOUT_LEVEL=ERROR       - QDB_LOG_W_FILE_LEVEL=ERROR       - QDB_LOG_W_HTTP_MIN_LEVEL=ERROR       - QDB_SHARED_WORKER_COUNT=2       - QDB_PG_USER=${DB_USER}       - QDB_PG_PASSWORD=${DB_PASSWORD}       - QDB_TELEMETRY_ENABLED=false       - QDB_CAIRO_SQL_COPY_ROOT=./

Обратите внимание в строках 5-8, что мы открываем порты 9000, 9009 и 9003. Эти порты, в частности порт 9000, используются для записи данных в таблицы QuestDB. Включив эти порты в раздел expose, а не в раздел ports, мы гарантируем, что только контейнеры, работающие в одной сети Docker, могут записывать данные. Порт 8812 доступен за пределами сети Docker и используется для запроса данных. Переменные окружения QDB_PG_USER и QDB_PG_PASSWORD, наряду с другими переменными, связанными с QuestDB, задаются в файле .env:

# Questdb config  DB_USER =admin  DB_PASSWORD =quest  DB_HOST =questdb  DB_PORT = 8812  DB_IMP_PORT = 9000  DB_NAME =qdb  DB_TRIAXIAL_OFFLOAD_TABLE_NAME =device_ off load

Управляющий код потребителя находится в main.py :

import asyncio import json from aiokafka import AIOKafkaConsumer from core.config import app_config from db.ingress import (create_connection,                         create_triaxial_table,                         write_sensor_payloads)   async def consume_messages() -> None:      """     Coroutine to consume smart phone sensor messages from a kafka topic     """      # Create a QuestDB connection     connection = create_connection(host=app_config.DB_HOST,                                    port=app_config.DB_PORT,                                    user_name=app_config.DB_USER,                                    password=app_config.DB_PASSWORD,                                    database=app_config.DB_NAME)          # Instantiate the event loop and consumer     loop = asyncio.get_event_loop()     consumer = AIOKafkaConsumer(         app_config.TOPIC_NAME,         loop=loop,         client_id='all',         bootstrap_servers=app_config.KAFKA_URL,         enable_auto_commit=False,     )      await consumer.start()     try:         async for msg in consumer:             print(msg.value)             print('################')             # Format each message in the log and write to QuestDB             write_sensor_payloads(json.loads(msg.value), app_config.DB_IMP_URL, app_config.DB_TRIAXIAL_OFFLOAD_TABLE_NAME)     finally:         await consumer.stop()         connection.close()  async def main():      await consume_messages()  if __name__ == "__main__":      # Create the table to store triaxial sensor data if it doesn't exist     connection = create_connection(host=app_config.DB_HOST,                                     port=app_config.DB_PORT,                                     user_name=app_config.DB_USER,                                     password=app_config.DB_PASSWORD,                                     database=app_config.DB_NAME)      create_triaxial_table(app_config.DB_TRIAXIAL_OFFLOAD_TABLE_NAME, connection)      asyncio.run(main())

Здесь многое нужно распаковать, но основная логика изложена в строках 35-39. Консумер асинхронно перебирает сообщения в указанной теме Kafka. Этот цикл будет непрерывно обрабатывать сообщения до тех пор, пока тема обновляется. Сообщения форматируются и записываются в таблицу Quest DB с помощью следующей функции:

def write_triaxial_sensor_data(data:dict, server_url:str, table_name:str):          """     Write triaxial phone sensor data to database tables     Parameters     ----------     data : dict         The raw request data sent by the phone     server_url : str         The URL where sensor data will be written to     table_name : str         The name of the table to write to      """      session_id = data['sessionId']     device_id = data['deviceId']      # Create an empty dict to store structured sensor from the payload     structured_payload = {'device_id':[],                             'session_id':[],                             'device_timestamp':[],                             'recorded_timestamp':[],                             'sensor_name':[],                             'x':[],                             'y':[],                             'z':[]                             }          for d in data['payload']:          # Triaxial sensors         if d.get("name") in DEVICE_TO_DB_SENSOR_NAME.keys():              structured_payload['device_id'].append(device_id)             structured_payload['session_id'].append(session_id)             structured_payload['device_timestamp'].append(str(datetime.fromtimestamp(int(d["time"]) / 1000000000)))             structured_payload['recorded_timestamp'].append(str(datetime.utcnow()))             structured_payload['sensor_name'].append(DEVICE_TO_DB_SENSOR_NAME.get(d.get("name")))             structured_payload['x'].append(d["values"]["x"])             structured_payload['y'].append(d["values"]["y"])             structured_payload['z'].append(d["values"]["z"])        output = StringIO()     pd.DataFrame(structured_payload).to_csv(output, sep=',', header=True, index=False)     output.seek(0)     contents = output.getvalue()     csv = {'data': (table_name, contents)}     response = requests.post(server_url, files=csv)

Вся полезная нагрузка форматируется и сохраняется в виде CSV-файла в памяти с помощью StringIO. Оттуда CSV отправляется через POST-запрос на порт записи Quest DB. Это облегчает быструю запись всей полезной нагрузки в QuestDB с использованием одного подключения и запроса.

Таблица, в которой хранятся данные датчиков, предназначена для обеспечения баланса между быстрой записью и быстрым чтением. Вот запрос для создания таблицы в QuestDB:

СОЗДАЙТЕ  ТАБЛИЦУ , ЕСЛИ НЕ  СУЩЕСТВУЕТ device_offload (      device_id TEXT,      session_id TEXT,      device_timestamp TEXT,      record_timestamp TEXT,      sensor_name TEXT,      x REAL ,      y REAL ,      z REAL  ) 

Поля device_id и session_id берутся непосредственно из первых двух записей необработанной полезной нагрузки, как обсуждалось ранее. device_timestamp — это время, когда на устройстве была собрана отдельная выборка данных датчика, в то время как recorded_timestamp — это время, когда выборка попала в базу данных. Благодаря этому мы можем измерить, сколько времени требуется, чтобы выборка данных попала с устройства в базу данных.

Поскольку мы работаем только с трехосными датчиками, мы можем сохранить их значения в полях x, y и z и указать, какому датчику принадлежит каждый образец, в поле sensor_name. Эта схема позволяет нам записывать данные с каждого датчика в полезной нагрузке в одну и ту же таблицу за одну запись, в отличие от записи в несколько таблиц, требующих многократной записи.

Важно отметить, что в реальных условиях эта таблица QuestDB, скорее всего, не будет конечным местом хранения данных. Вместо этого таблица будет действовать как буфер, позволяя приложениям легко получать доступ к данным в структурированном формате. Данные высокочастотных датчиков, в нашем случае 50 Гц, быстро растут, и их становится трудно поддерживать. Скорее всего, мы бы внедрили еще один конвеер Kafka, отвечающий за перемещение старых данных из QuestDB в архив.

Последним шагом для этого консумера является добавление соответствующих команд docker-compose:

db_consumer:     build:       context: ./db_consumer       dockerfile: Dockerfile     command: python main.py     env_file:       - .env     depends_on:       - kafka       - zookeeper

Дэшборд

У нас есть все для визуализации данных датчика в том виде, в каком они записаны в QuestDB. Чтобы сделать это, нам нужно создать другое приложение Fast API, которое опрашивает базу данных и использует события, отправляемые сервером (SSE), для обновления HTML-страницы. Вот последняя структура каталогов, которую нужно изучить:

|-ui_server  | |-app  | | |-core  | | | |-config.py  | | |-models  | | | |-sensors.py  | | |-static  | | | |-js  | | | | |-main.js  | | |-db  | | | |-data_api.py  | | |-templates  | | | |-index.html  | | |-main.py  | |-requirements.txt  | |-Dockerfile  | |-entrypoint.sh

Как и прежде, main.py — драйвер для этого приложения:

import asyncio import json import logging import sys from fastapi import FastAPI from fastapi.requests import Request from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from sse_starlette.sse import EventSourceResponse from core.config import app_config from db.data_api import create_connection, get_recent_triaxial_data, DEVICE_TO_DB_SENSOR_NAME from models.sensors import SensorName   CONNECTION = create_connection(app_config.DB_HOST,                                app_config.DB_PORT,                                app_config.DB_USER,                                app_config.DB_PASSWORD,                                app_config.DB_NAME )  logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__)  app = FastAPI() origins = [    f"http://localhost:{app_config.UI_PORT}",    f"http://127.0.0.1:{app_config.UI_PORT}",    f"http://0.0.0.0:{app_config.UI_PORT}" ]  app.add_middleware(     CORSMiddleware,     allow_origins=origins,     allow_credentials=True,     allow_methods=["*"],     allow_headers=["*"], )  app.mount("/static", StaticFiles(directory="static"), name="static") templates = Jinja2Templates(directory="templates") templates = Jinja2Templates(directory="templates")  @app.get("/", response_class=HTMLResponse) async def index(request: Request) -> templates.TemplateResponse:     return templates.TemplateResponse("index.html", {"request": request})   @app.get('/chart-data') async def message_stream(request: Request):     def new_messages():         yield True     async def event_generator():         while True:                         if await request.is_disconnected():                 break                              if new_messages():                  data = get_recent_triaxial_data(connection=CONNECTION,                                                  table_name=app_config.DB_TRIAXIAL_OFFLOAD_TABLE_NAME,                                                 sensor_name=DEVICE_TO_DB_SENSOR_NAME[SensorName.ACC.value],                                                 sample_rate=app_config.PHONE_SAMPLE_RATE,                                                 num_seconds=1,                                                 max_lookback_seconds=60)                                  message_data = {}                  for device_id in data['device_id'].unique():                      data_device = data[data['device_id']==device_id]                      message_data[device_id] = {                                     'time':[t[11:] for t in list(data_device['recorded_timestamp'].astype(str).values)],                                     'x':list(data_device['x'].astype(float).values),                                     'y':list(data_device['y'].astype(float).values),                                     'z':list(data_device['z'].astype(float).values)                                 }                  message = json.dumps(message_data)                 yield {                         "event": "new_message",                         "id": "message_id",                         "retry":1500000,                         "data": message                 }              await asyncio.sleep(0.1)      return EventSourceResponse(event_generator())

Каждые 0,1 секунды, строка 90, функция message_stream запрашивает у базы данных самую последнюю секунду показаний датчика, строка 62. В этой итерации панели мониторинга запрашиваются и отображаются только данные акселерометра. Аргументу max_lookback_seconds присвоено значение 60 — это означает, что все телефоны, которые не отправляли данные за последние 60 секунд, будут отфильтрованы в запросе. Следовательно, на этой панели мониторинга будут отображаться данные акселерометра за последнюю секунду для всех телефонов, отправивших данные за последнюю минуту. Вот логика запроса:

def get_recent_triaxial_data(connection:pg.connect,                              table_name:str,                              sensor_name:str,                              sample_rate:int,                              num_seconds:float,                              max_lookback_seconds:float):      """     Query the most recent data from a triaxial smartphone sensor.     Parameters     ----------     connection:pg.connect         A postgres connection object     table_name:str         The table where the sensor data is stored     sensor_name:str         The name of the sensor to query     sample_rate:int         The sampling rate of the sensor (in hz)     num_seconds:float         The number of seconds of data to pull     max_lookback_seconds:float         The maximum amount seconds to look for data from.         For instance, if a device stopped producing data         10 seconds ago, and max_lookback_seconds = 10,         then data for this device will be ignored.     Returns     -------     A DataFrame with the requested sensor data     """      # The number of samples to get     num_samples:int = int(sample_rate*num_seconds)      query:str = f"""with tmp as (select device_id,                                        recorded_timestamp,                                        x,                                        y,                                        z,                                        row_number() over(partition by device_id order by                                                         recorded_timestamp desc) as rn                                         from {table_name}                                         where sensor_name = '{sensor_name}'                                         and recorded_timestamp::timestamp >= dateadd('s', -{int(max_lookback_seconds)}, now())                                )                               select * from tmp                                where rn <= {num_samples}                               """      return pd.read_sql(query, connection)

Добавьте необходимые строки в файл docker-compose:

ui_server:     build:       context: ./ui_server       dockerfile: Dockerfile     command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 5000     ports:       - 5000:5000     env_file:       - .env     depends_on:       - db_consumer

И панель мониторинга должна быть доступна по адресу http://localhost:5000:

В этой статье представлен общий обзор проекта потоковой передачи в реальном времени с источником данных, к которому имеет доступ большинство людей (смартфоны). Хотя здесь много движущихся частей, мы просто заглянули в мир потоковой передачи данных.

https://slurm.club/3HmEXDx

Но о Kafka можно узнать больше.

Углублённый курс с практикой на Java или Golang и платформой Spring+Docker+Postgres переведёт вас на новый уровень владения инструментом.

На курсе «Apache Kafka для разработчиков» мы обсудим:

  • неправильное использование Кафка и отсутствие коммитов в ней;

  • ваши кейсы о проблемах при работе с Apache Kafka;

  • опыт создания Data Lake на ~80 ТБ с помощью Apache Kafka;

  • особенности эксплуатации kafka с retention в 99999999.

Старт потока — 12 мая 2023. Присоединяйтесь кобучению сейчас ? https://slurm.club/3HmEXDx

Полезные ссылки для изучения:

  1. Apache Kafka

  2. Event-Driven Architectures — The Queue vs The Log

  3. Lucidchart

  4. Kafka Poc using FastApi

  5. geo-stream-kafka

  6. 18 Most Popular IoT Devices in 2022

  7. FastAPI

  8. QuestDB

  9. Row vs Column Oriented Databases


ссылка на оригинал статьи https://habr.com/ru/articles/730380/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *