![](https://habrastorage.org/getpro/habr/upload_files/4c9/ac1/29b/4c9ac129b0a43310122b9425bfa6c166.png)
Девайсы повсюду. Смартфоны, холодильники, дверные звонки, часы, медицинские датчики, системы безопасности и фитнес-трекеры — все это лишь некоторые из них, которые стали обычным явлением. Они постоянно записывают потенциально высокочастотную информацию и образуют сеть, известную как «Интернет вещей», или IoT, представляя обширные источники данных.
Хотя ресурсов по этой теме достаточно, немногие приводят примеры с реальными данными, доступными любому желающему. Переходя от статьи к статье, чтобы узнать о системах, управляемых событиями, и потоковых технологиях, таких как Apache Kafka, Harrison Hoffman наткнулся на приложение для смартфонов Sensor Logger, которое позволяет пользователям передавать данные с датчиков движения на свои телефоны. Такой вариант показался идеальным способом обучения, поэтому родился проект «smartphone_sensor_stream». Этот проект использует FastAPI, Kafka, QuestDB и Docker для визуализации данных датчиков в реальном времени на информационной панели.
В этой статье мы рассмотрим все основные компоненты этого проекта на продвинутом уровне. Все необходимое для локального запуска проекта доступно на GitHub, а краткая демонстрация доступна на YouTube.
Архитектура проекта
Давайте начнем с рассмотрения архитектуры этого проекта. То есть с того, как именно данные будут поступать со смартфонов на панель мониторинга:
![](https://habrastorage.org/getpro/habr/upload_files/bc4/eb6/730/bc4eb6730bf9830402b6dd0b388808df.png)
Каждый смартфон отправляет показания датчиков акселерометра, гироскопа и магнитометра через POST-запрос в приложение FastAPI. Производитель FastAPI асинхронно записывает показания датчика в раздел Kafka в виде JSON, данные из тела запроса. Каждый объект JSON обрабатывается процессом python, консумером, а потом сохраняется в таблице Quest DB. Как только данные попадают в базу данных, они становятся доступными для любой зависимой службы или приложения. В первой части этого проекта мы будем выводить показания датчиков на панель мониторинга, используя события, отправляемые сервером (SSE).
Структура каталогов и компоновка Docker Compose
Этот проект представляет собой набор небольших сервисов, которые взаимодействуют друг с другом для получения данных со смартфонов на информационную панель. Вот структура каталогов:
|-producer | |-app | | |-core | | | |-config.py | | |-__init__.py | | |-schemas | | | |-sensors.py | | |-main.py | |-requirements.txt | |-Dockerfile | |-entrypoint.sh |-db_consumer | |-app | | |-core | | | |-config.py | | |-models | | | |-sensors.py | | |-db | | | |-ingress.py | | |-main.py | |-requirements.txt | |-Dockerfile | |-entrypoint.sh |-ui_server | |-app | | |-core | | | |-config.py | | |-models | | | |-sensors.py | | |-static | | | |-js | | | | |-main.js | | |-db | | | |-data_api.py | | |-templates | | | |-index.html | | |-main.py | |-requirements.txt | |-Dockerfile | |-entrypoint.sh |-README.md |-.gitignore |-.env |-docker-compose.yml
Мы напишем три сервиса: продюсер, консумер и пользовательский интерфейс. Каждая служба упакована с помощью Dockerfile и организована при помощи docker-compose. Docker-compose позволяет нам запускать сервисы, которые мы пишем, с внешними сервисами, Kafka, Zookeeper и QuestDB, в виде отдельных контейнеров, подключенных через внутреннюю сеть. Все, что нам нужно для организации служб в этом проекте, находится в файле docker-compose:
version: '3.8' services: zookeeper: image: bitnami/zookeeper:latest ports: - 2181:2181 environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: bitnami/kafka:latest ports: - 9092:9092 - 9093:9093 environment: - KAFKA_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT - KAFKA_CFG_LISTENERS=CLIENT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT depends_on: - zookeeper questdb: image: questdb/questdb container_name: questdb restart: always expose: - 9000 - 9009 - 9003 ports: - 8812:8812 volumes: - ./questdb:/root/.questdb environment: - QDB_LOG_W_STDOUT_LEVEL=ERROR - QDB_LOG_W_FILE_LEVEL=ERROR - QDB_LOG_W_HTTP_MIN_LEVEL=ERROR - QDB_SHARED_WORKER_COUNT=2 - QDB_PG_USER=${DB_USER} - QDB_PG_PASSWORD=${DB_PASSWORD} - QDB_TELEMETRY_ENABLED=false - QDB_CAIRO_SQL_COPY_ROOT=./ producer: build: context: ./producer dockerfile: Dockerfile command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 8000 ports: - 8000:8000 env_file: - .env depends_on: - kafka - zookeeper db_consumer: build: context: ./db_consumer dockerfile: Dockerfile command: python main.py env_file: - .env depends_on: - kafka - zookeeper ui_server: build: context: ./ui_server dockerfile: Dockerfile command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 5000 ports: - 5000:5000 env_file: - .env depends_on: - db_consumer kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui ports: - "18080:8080" restart: always environment: - KAFKA_CLUSTERS_0_NAME=local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181 depends_on: - kafka - zookeeper volumes: zookeeper_data: driver: local kafka_data: driver: local
Обратите внимание на четыре сервиса, которые мы не пишем сами (к счастью): Zookeeper, Kafka, QuestDB и Kafka-UI. Эти сервисы работают совместно с производителем, потребителем и пользовательским интерфейсом для создания проекта. Мы рассмотрим каждый сервис в отдельности, но сначала нам нужно разобраться с источником данных.
Регистратор датчиков
Sensor Logger — это приложение для iOS и Android, которое позволяет пользователям регистрировать показания датчиков, связанных с движением, со своих смартфонов. Пользователи могут просматривать показания датчиков в режиме реального времени, экспортировать данные в виде файлов и отправлять оперативные данные на сервер по протоколу HTTP. Этот проект использует функциональность HTTP для извлечения показаний датчиков. Чтобы настроить регистратор, для начала убедитесь, что выбраны все следующие датчики:
![](https://habrastorage.org/getpro/habr/upload_files/31c/8eb/b83/31c8ebb83b23acaabb5b75b21c3fe3b7.png)
Мы будем получать показания акселерометра, гироскопа и магнитометра телефона. Далее нам нужно настроить параметры регистратора датчиков таким образом, чтобы он знал, куда именно отправлять данные:
![](https://habrastorage.org/getpro/habr/upload_files/255/1b9/aa4/2551b9aa441ec8a44842dd241f57230c.png)
Наиболее важным компонентом является обеспечение правильности “Push URL” — это конечная точка производителя FastAPI, которая принимает необработанные показания датчиков с помощью POST-запросов. Мы будем использовать наш компьютер в качестве сервера, поэтому нам нужно определить соответствующий IP-адрес. На Mac это находится в разделе Системные настройки -> Сеть.
![](https://habrastorage.org/getpro/habr/upload_files/19c/338/f34/19c338f344a8d42ae3c2550c4e0c8281.png)
Обратите внимание, что IP-адрес компьютера обычно уникален для сети WI-FI, что означает, что новый IP-адрес выделяется каждый раз, когда компьютер подключается к новой сети. Поэтому крайне важно, чтобы смартфон и главный компьютер находились в одной сети. Производитель FastAPI принимает показания датчиков на:
http://%7Byour_ip_address%7D:8000/phone-producer
Вставьте приведенный выше URL-адрес в поле «Push URL», и регистратор датчиков должен быть готов к работе!
Кафка и ZooKeeper
В этом разделе не будем вдаваться в подробности о Kafka, поскольку на платформе доступно много ресурсов. Однако можно сказать, что Kafka — это высокопроизводительный фреймворк для хранения и чтения потоковых данных. Фундаментальная структура данных Кафки — журнал. Приложения, которые записывают сообщения в журнал, называются продюсерами. В отличие от очереди, сообщения в журнале сохраняются даже после прочтения — это позволяет нескольким приложениям, известным как консумеры, читать одновременно с разных позиций.
Для простоты в этом проекте есть только один продюсер — приложение FastAPI, которое записывает необработанные показания датчиков в Kafka, и один консумер, процесс на python, который считывает сообщения из Kafka и форматирует их в базе данных. Zookeeper — это сервис, который помогает управлять различными компонентами Kafka.
Для локального запуска Kafka и Zookeeper необходимы только два образа docker:
zookeeper: image: bitnami/zookeeper:latest ports: - 2181:2181 environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: bitnami/kafka:latest ports: - 9092:9092 - 9093:9093 environment: - KAFKA_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT - KAFKA_CFG_LISTENERS=CLIENT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT depends_on: - zookeeper kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui ports: - "18080:8080" restart: always environment: - KAFKA_CLUSTERS_0_NAME=local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181 depends_on: - kafka - zookeeper
Мы будем использовать дистрибутив Kafka и Zookeeper от Bitmani. Kafka-UIImage позволяет пользователям взаимодействовать с кластерами Kafka через веб-приложение, но не требуется для этого проекта. Сохраните приведенный выше файл docker-compose как docker-compose.yml, запустите docker-compose, и графический интерфейс, подобный следующему, должен быть доступен по адресу http://localhost:18080/.
![](https://habrastorage.org/getpro/habr/upload_files/331/949/5ed/3319495ed8782c32f8e1d73cb0de99e7.png)
Информация о брокерах, темах и консумерах будет добавляться на эту панель мониторинга по мере добавления компонентов в систему.
Продюсер
Пока что у нас есть регистратор датчиков, настроенный для отправки необработанных показаний датчиков на сервер, и экземпляр Kafka готов к приему этих показаний. Следующий шаг — создать мост между исходными данными и Kafka-продюсером. Продюсером в этом проекте является быстрое API-приложение, которое принимает данные, отправляемые со смартфонов, и записывает их в журнал Kafka. Вот макет продюсера:
|-producer | |-app | | |-core | | | |-config.py | | |-__init__.py | | |-schemas | | | |-sensors.py | | |-main.py | |-requirements.txt | |-Dockerfile | |-entrypoint.sh
Мы не будем просматривать каждый файл в каталоге продюсера, поскольку все доступно на GitHub. Вместо этого давайте взглянем на main.py
управляющий скрипт API производителя:
import json from fastapi import FastAPI import asyncio from aiokafka import AIOKafkaProducer from schemas.sensors import SensorReading, SensorResponse from core.config import app_config from loguru import logger app = FastAPI(title=app_config.PROJECT_NAME) loop = asyncio.get_event_loop() producer = AIOKafkaProducer( loop=loop, client_id=app_config.PROJECT_NAME, bootstrap_servers=app_config.KAFKA_URL ) @app.on_event("startup") async def startup_event(): await producer.start() @app.on_event("shutdown") async def shutdown_event(): await producer.stop() @app.post("/phone-producer/") async def kafka_produce(data: SensorReading): """ Produce a message containing readings from a smartphone sensor. Parameters ---------- data : SensorReading The request body containing sensor readings and metadata. Returns ------- response : SensorResponse The response body corresponding to the processed sensor readings from the request. """ await producer.send(app_config.TOPIC_NAME, json.dumps(data.dict()).encode("ascii")) response = SensorResponse( messageId=data.messageId, sessionId=data.sessionId, deviceId=data.deviceId ) logger.info(response) return response
Строка 9 создает экземпляр объекта Fast API. Строки 11-17 создают экземпляр объекта Kafka-продюсер с помощью Aiokafka. Aiokafka позволяет нам записывать сообщения в Kafka асинхронно. Это значит, что нам не нужно ждать, пока Kafka получит и обработает сообщение, в строке 45, прежде чем мы перейдем к следующей строке кода. Вместо этого Aiokafka отправляет текущее сообщение в Kafka и почти мгновенно готова выдать другое сообщение. Строки 27-55 определяют маршрут, по которому будут приниматься необработанные показания датчиков. Чтобы лучше понять это, давайте взглянем на формат тела запроса, который ожидает этот маршрут — аргумент data
):
{"messageId": 20, "sessionId": "4bf3b3b9-a241-4aaa-b1d3-c05100df9976", "deviceId": "86a5b0e3-6e06-40e2-b226-5a72bd39b65b", "payload": [{"name": "accelerometeruncalibrated", "time": "1671406719721160400", "values": {"z": -0.9372100830078125, "y": -0.3241424560546875, "x": 0.0323486328125}}, {"name": "magnetometeruncalibrated", "time": "1671406719726579500", "values": {"z": -5061.64599609375, "y": 591.083251953125, "x": 3500.541015625}}, {"name": "gyroscopeuncalibrated", "time": "1671406719726173400", "values": {"z": -0.004710599314421415, "y": -0.013125921599566936, "x": 0.009486978873610497}}, ... ]}
Каждое тело запроса представляет собой объект JSON с записями «MessageId», «SessionID», «DeviceID» и «payload». Смартфоны однозначно идентифицируются по их «идентификатору устройства». Каждый раз, когда телефон начинает новый поток, для него создается новый «sessionId». Запись «MessageId» указывает порядок расположения сообщений в последовательности из текущего сеанса. Запись «payload» представляет собой массив объектов JSON, которые содержат показания для каждого датчика, настроенного в Sensor Logger. Каждая запись «payload» содержит имя датчика, время записи показаний по времени unix и само считывание. Мы работаем исключительно с трехосными датчиками, поэтому каждый датчик должен иметь показания «x», «y» и «z», соответствующие трем пространственным измерениям.
Маршрут FastAPI записывает необработанное тело запроса непосредственно в раздел Kafka, в строке 45, а метаданные регистрируются и возвращаются в строках 47-55. Этот маршрут доступен по адресу http://{your_ip_address}:8000/phone-producer, как описано в разделе «Регистратор датчиков» (Sensor Logger
) . Все запросы проверяются объектом Pydantic SensorReading. То есть, любой запрос, который не соответствует формату регистратора датчиков, не будет обработан маршрутом:
from pydantic import BaseModel, validator from datetime import datetime from typing import List, Dict, Union class SensorReading(BaseModel): """ Base model class for incoming requests from smartphone sensors Attributes ---------- messageId : int The identifier of a message in the current session sessionId : int The identifier of a session deviceId : int The identifier of the device sending the data payload : List[Dict[str:Union[str, int, Dict]]] The payload of the request containing sensor readings and metadata about the readings """ messageId: int sessionId: str deviceId: str payload: List[Dict[str, Union[str, int, Dict]]] class SensorResponse(BaseModel): """ Base model class for the response of the sensor request endpoint Attributes ---------- messageId : int The identifier of a message in the current session sessionId : int The identifier of a session deviceId : int The identifier of the device sending the data timestamp : str The timestamp when a sensor request was processed """ messageId: str sessionId: str deviceId: str timestamp: str = "" @validator("timestamp", pre=True, always=True) def set_datetime_utcnow(cls, v): return str(datetime.utcnow())
Конфигурация для производителя обрабатывается с помощью переменных окружения, которые считываются объектом Pedantic Base Settings
:
from pydantic import BaseSettings, validator # Load environment variables into a pydantic BaseSetting object class AppConfig(BaseSettings): PROJECT_NAME : str KAFKA_HOST : str KAFKA_PORT : str TOPIC_NAME : str KAFKA_URL : str = "" class Config: case_sensitive = True @validator("KAFKA_URL", pre=True, always=True) def set_kafka_url(cls, v, values, **kwargs): return values['KAFKA_HOST'] + ":" + values['KAFKA_PORT'] app_config = AppConfig()
Переменные окружения хранятся в файле .env:
# Kafka config PROJECT_NAME=phone_stream_producer TOPIC_NAME=raw-phone-stream KAFKA_HOST=kafka KAFKA_PORT=9092
и передаются проидюсеру в файле docker-compose, строка 9 ниже:
producer: build: context: ./producer dockerfile: Dockerfile command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 8000 ports: - 8000:8000 env_file: - .env depends_on: - kafka - zookeeper
Обратите внимание, что значение host в команде запуска равно 0.0.0.0. Это позволяет получить доступ к продюсеру по его IP-адресу с любого устройства в локальной сети.
Консумер
Теперь у нас есть инфраструктура для потоковой передачи данных датчиков со смартфонов в Fast API producer и Kafka. Следующим шагом является создание процесса — консумера, который считывает данные из Kafka и что-то делает с данными. Консумеры могут нести ответственность за все, что связано с чтением данных, хранящихся в журнале, и манипулированием ими. Для этого проекта они будут использоваться для преобразования необработанных показаний датчиков и сохранения их в базе данных временных рядов, известной как QuestDB. Вот структура каталогов для консумера:
|-db_consumer | |-app | | |-core | | | |-config.py | | |-models | | | |-sensors.py | | |-db | | | |-ingress.py | | |-main.py | |-requirements.txt | |-Dockerfile | |-entrypoint.sh
Перед созданием консумера нам нужно создать экземпляр Quest DB. Quest DB — это высокопроизводительная база данных временных рядов с открытым исходным кодом и API, совместимым с Postgres. Это означает, что мы можем запрашивать таблицы Quest DB так, как если бы они были таблицами Postgres, ориентированными на строки. При этом мы пользуемся преимуществами таблиц, ориентированных на столбцы. Мы можем запустить QuestDB с помощью docker:
questdb: image: questdb/questdb container_name: questdb restart: always expose: - 9000 - 9009 - 9003 ports: - 8812:8812 volumes: - ./questdb:/root/.questdb environment: - QDB_LOG_W_STDOUT_LEVEL=ERROR - QDB_LOG_W_FILE_LEVEL=ERROR - QDB_LOG_W_HTTP_MIN_LEVEL=ERROR - QDB_SHARED_WORKER_COUNT=2 - QDB_PG_USER=${DB_USER} - QDB_PG_PASSWORD=${DB_PASSWORD} - QDB_TELEMETRY_ENABLED=false - QDB_CAIRO_SQL_COPY_ROOT=./
Обратите внимание в строках 5-8, что мы открываем порты 9000, 9009 и 9003. Эти порты, в частности порт 9000, используются для записи данных в таблицы QuestDB. Включив эти порты в раздел expose
, а не в раздел ports
, мы гарантируем, что только контейнеры, работающие в одной сети Docker, могут записывать данные. Порт 8812 доступен за пределами сети Docker и используется для запроса данных. Переменные окружения QDB_PG_USER
и QDB_PG_PASSWORD
, наряду с другими переменными, связанными с QuestDB, задаются в файле .env
:
# Questdb config DB_USER =admin DB_PASSWORD =quest DB_HOST =questdb DB_PORT = 8812 DB_IMP_PORT = 9000 DB_NAME =qdb DB_TRIAXIAL_OFFLOAD_TABLE_NAME =device_ off load
Управляющий код потребителя находится в main.py
:
import asyncio import json from aiokafka import AIOKafkaConsumer from core.config import app_config from db.ingress import (create_connection, create_triaxial_table, write_sensor_payloads) async def consume_messages() -> None: """ Coroutine to consume smart phone sensor messages from a kafka topic """ # Create a QuestDB connection connection = create_connection(host=app_config.DB_HOST, port=app_config.DB_PORT, user_name=app_config.DB_USER, password=app_config.DB_PASSWORD, database=app_config.DB_NAME) # Instantiate the event loop and consumer loop = asyncio.get_event_loop() consumer = AIOKafkaConsumer( app_config.TOPIC_NAME, loop=loop, client_id='all', bootstrap_servers=app_config.KAFKA_URL, enable_auto_commit=False, ) await consumer.start() try: async for msg in consumer: print(msg.value) print('################') # Format each message in the log and write to QuestDB write_sensor_payloads(json.loads(msg.value), app_config.DB_IMP_URL, app_config.DB_TRIAXIAL_OFFLOAD_TABLE_NAME) finally: await consumer.stop() connection.close() async def main(): await consume_messages() if __name__ == "__main__": # Create the table to store triaxial sensor data if it doesn't exist connection = create_connection(host=app_config.DB_HOST, port=app_config.DB_PORT, user_name=app_config.DB_USER, password=app_config.DB_PASSWORD, database=app_config.DB_NAME) create_triaxial_table(app_config.DB_TRIAXIAL_OFFLOAD_TABLE_NAME, connection) asyncio.run(main())
Здесь многое нужно распаковать, но основная логика изложена в строках 35-39. Консумер асинхронно перебирает сообщения в указанной теме Kafka. Этот цикл будет непрерывно обрабатывать сообщения до тех пор, пока тема обновляется. Сообщения форматируются и записываются в таблицу Quest DB с помощью следующей функции:
def write_triaxial_sensor_data(data:dict, server_url:str, table_name:str): """ Write triaxial phone sensor data to database tables Parameters ---------- data : dict The raw request data sent by the phone server_url : str The URL where sensor data will be written to table_name : str The name of the table to write to """ session_id = data['sessionId'] device_id = data['deviceId'] # Create an empty dict to store structured sensor from the payload structured_payload = {'device_id':[], 'session_id':[], 'device_timestamp':[], 'recorded_timestamp':[], 'sensor_name':[], 'x':[], 'y':[], 'z':[] } for d in data['payload']: # Triaxial sensors if d.get("name") in DEVICE_TO_DB_SENSOR_NAME.keys(): structured_payload['device_id'].append(device_id) structured_payload['session_id'].append(session_id) structured_payload['device_timestamp'].append(str(datetime.fromtimestamp(int(d["time"]) / 1000000000))) structured_payload['recorded_timestamp'].append(str(datetime.utcnow())) structured_payload['sensor_name'].append(DEVICE_TO_DB_SENSOR_NAME.get(d.get("name"))) structured_payload['x'].append(d["values"]["x"]) structured_payload['y'].append(d["values"]["y"]) structured_payload['z'].append(d["values"]["z"]) output = StringIO() pd.DataFrame(structured_payload).to_csv(output, sep=',', header=True, index=False) output.seek(0) contents = output.getvalue() csv = {'data': (table_name, contents)} response = requests.post(server_url, files=csv)
Вся полезная нагрузка форматируется и сохраняется в виде CSV-файла в памяти с помощью StringIO. Оттуда CSV отправляется через POST-запрос на порт записи Quest DB. Это облегчает быструю запись всей полезной нагрузки в QuestDB с использованием одного подключения и запроса.
Таблица, в которой хранятся данные датчиков, предназначена для обеспечения баланса между быстрой записью и быстрым чтением. Вот запрос для создания таблицы в QuestDB:
СОЗДАЙТЕ ТАБЛИЦУ , ЕСЛИ НЕ СУЩЕСТВУЕТ device_offload ( device_id TEXT, session_id TEXT, device_timestamp TEXT, record_timestamp TEXT, sensor_name TEXT, x REAL , y REAL , z REAL )
Поля device_id
и session_id
берутся непосредственно из первых двух записей необработанной полезной нагрузки, как обсуждалось ранее. device_timestamp
— это время, когда на устройстве была собрана отдельная выборка данных датчика, в то время как recorded_timestamp
— это время, когда выборка попала в базу данных. Благодаря этому мы можем измерить, сколько времени требуется, чтобы выборка данных попала с устройства в базу данных.
Поскольку мы работаем только с трехосными датчиками, мы можем сохранить их значения в полях x
, y
и z
и указать, какому датчику принадлежит каждый образец, в поле sensor_name
. Эта схема позволяет нам записывать данные с каждого датчика в полезной нагрузке в одну и ту же таблицу за одну запись, в отличие от записи в несколько таблиц, требующих многократной записи.
Важно отметить, что в реальных условиях эта таблица QuestDB, скорее всего, не будет конечным местом хранения данных. Вместо этого таблица будет действовать как буфер, позволяя приложениям легко получать доступ к данным в структурированном формате. Данные высокочастотных датчиков, в нашем случае 50 Гц, быстро растут, и их становится трудно поддерживать. Скорее всего, мы бы внедрили еще один конвеер Kafka, отвечающий за перемещение старых данных из QuestDB в архив.
Последним шагом для этого консумера является добавление соответствующих команд docker-compose:
db_consumer: build: context: ./db_consumer dockerfile: Dockerfile command: python main.py env_file: - .env depends_on: - kafka - zookeeper
Дэшборд
У нас есть все для визуализации данных датчика в том виде, в каком они записаны в QuestDB. Чтобы сделать это, нам нужно создать другое приложение Fast API, которое опрашивает базу данных и использует события, отправляемые сервером (SSE), для обновления HTML-страницы. Вот последняя структура каталогов, которую нужно изучить:
|-ui_server | |-app | | |-core | | | |-config.py | | |-models | | | |-sensors.py | | |-static | | | |-js | | | | |-main.js | | |-db | | | |-data_api.py | | |-templates | | | |-index.html | | |-main.py | |-requirements.txt | |-Dockerfile | |-entrypoint.sh
Как и прежде, main.py
— драйвер для этого приложения:
import asyncio import json import logging import sys from fastapi import FastAPI from fastapi.requests import Request from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from sse_starlette.sse import EventSourceResponse from core.config import app_config from db.data_api import create_connection, get_recent_triaxial_data, DEVICE_TO_DB_SENSOR_NAME from models.sensors import SensorName CONNECTION = create_connection(app_config.DB_HOST, app_config.DB_PORT, app_config.DB_USER, app_config.DB_PASSWORD, app_config.DB_NAME ) logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) app = FastAPI() origins = [ f"http://localhost:{app_config.UI_PORT}", f"http://127.0.0.1:{app_config.UI_PORT}", f"http://0.0.0.0:{app_config.UI_PORT}" ] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.mount("/static", StaticFiles(directory="static"), name="static") templates = Jinja2Templates(directory="templates") templates = Jinja2Templates(directory="templates") @app.get("/", response_class=HTMLResponse) async def index(request: Request) -> templates.TemplateResponse: return templates.TemplateResponse("index.html", {"request": request}) @app.get('/chart-data') async def message_stream(request: Request): def new_messages(): yield True async def event_generator(): while True: if await request.is_disconnected(): break if new_messages(): data = get_recent_triaxial_data(connection=CONNECTION, table_name=app_config.DB_TRIAXIAL_OFFLOAD_TABLE_NAME, sensor_name=DEVICE_TO_DB_SENSOR_NAME[SensorName.ACC.value], sample_rate=app_config.PHONE_SAMPLE_RATE, num_seconds=1, max_lookback_seconds=60) message_data = {} for device_id in data['device_id'].unique(): data_device = data[data['device_id']==device_id] message_data[device_id] = { 'time':[t[11:] for t in list(data_device['recorded_timestamp'].astype(str).values)], 'x':list(data_device['x'].astype(float).values), 'y':list(data_device['y'].astype(float).values), 'z':list(data_device['z'].astype(float).values) } message = json.dumps(message_data) yield { "event": "new_message", "id": "message_id", "retry":1500000, "data": message } await asyncio.sleep(0.1) return EventSourceResponse(event_generator())
Каждые 0,1 секунды, строка 90, функция message_stream
запрашивает у базы данных самую последнюю секунду показаний датчика, строка 62. В этой итерации панели мониторинга запрашиваются и отображаются только данные акселерометра. Аргументу max_lookback_seconds
присвоено значение 60 — это означает, что все телефоны, которые не отправляли данные за последние 60 секунд, будут отфильтрованы в запросе. Следовательно, на этой панели мониторинга будут отображаться данные акселерометра за последнюю секунду для всех телефонов, отправивших данные за последнюю минуту. Вот логика запроса:
def get_recent_triaxial_data(connection:pg.connect, table_name:str, sensor_name:str, sample_rate:int, num_seconds:float, max_lookback_seconds:float): """ Query the most recent data from a triaxial smartphone sensor. Parameters ---------- connection:pg.connect A postgres connection object table_name:str The table where the sensor data is stored sensor_name:str The name of the sensor to query sample_rate:int The sampling rate of the sensor (in hz) num_seconds:float The number of seconds of data to pull max_lookback_seconds:float The maximum amount seconds to look for data from. For instance, if a device stopped producing data 10 seconds ago, and max_lookback_seconds = 10, then data for this device will be ignored. Returns ------- A DataFrame with the requested sensor data """ # The number of samples to get num_samples:int = int(sample_rate*num_seconds) query:str = f"""with tmp as (select device_id, recorded_timestamp, x, y, z, row_number() over(partition by device_id order by recorded_timestamp desc) as rn from {table_name} where sensor_name = '{sensor_name}' and recorded_timestamp::timestamp >= dateadd('s', -{int(max_lookback_seconds)}, now()) ) select * from tmp where rn <= {num_samples} """ return pd.read_sql(query, connection)
Добавьте необходимые строки в файл docker-compose:
ui_server: build: context: ./ui_server dockerfile: Dockerfile command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 5000 ports: - 5000:5000 env_file: - .env depends_on: - db_consumer
И панель мониторинга должна быть доступна по адресу http://localhost:5000:
![](https://habrastorage.org/getpro/habr/upload_files/dc9/d1f/2d7/dc9d1f2d724bd1850ba54766a4ccf404.png)
В этой статье представлен общий обзор проекта потоковой передачи в реальном времени с источником данных, к которому имеет доступ большинство людей (смартфоны). Хотя здесь много движущихся частей, мы просто заглянули в мир потоковой передачи данных.
![https://slurm.club/3HmEXDx https://slurm.club/3HmEXDx](https://habrastorage.org/getpro/habr/upload_files/b03/1b1/9b9/b031b19b9381e05c9f4e61e0c4a0e4e9.jpg)
Но о Kafka можно узнать больше.
Углублённый курс с практикой на Java или Golang и платформой Spring+Docker+Postgres переведёт вас на новый уровень владения инструментом.
На курсе «Apache Kafka для разработчиков» мы обсудим:
-
неправильное использование Кафка и отсутствие коммитов в ней;
-
ваши кейсы о проблемах при работе с Apache Kafka;
-
опыт создания Data Lake на ~80 ТБ с помощью Apache Kafka;
-
особенности эксплуатации kafka с retention в 99999999.
Старт потока — 12 мая 2023. Присоединяйтесь кобучению сейчас ? https://slurm.club/3HmEXDx
Полезные ссылки для изучения:
ссылка на оригинал статьи https://habr.com/ru/articles/730380/
Добавить комментарий