Эта статья является живым очерком о буднях современного тестировщика в agile-команде и рассматривает подход в тестировании, при котором описывается вариант решения проблемы настройки и разворачивания тестируемого приложения. В качестве примера возьмем сервис для поиска данных во временных рядах — Time Series Pattern, разрабатываемый нами на аутсорс.
Меня зовут Роман Мостафин, и я работаю QA-инженером в компании Factory5.
Начинаем.
На данный момент многие команды сталкиваются с тем, что сложно обслуживать среду для тестирования в виде стендов, а также с многопользовательским доступом к одним и тем же данным и версионирование этих стендов. Порою эта проблема стоит довольно остро. Решением может быть поднятие сервисов и части систем на локальной машине. К его плюсам относится: легкость версионирования, быстрота разворачивания и настройки. Данное решение применяется в нашей компании для тестирования отдельных сервисов. И сейчас я расскажу, как мы его используем на примере нашего сервиса разрабатываемого на аутсорсе.
Установка сервисов
Для начала нам понадобится утилита Docker — система контейнеризации и виртуализации.
Команда сделала сборку в виде докер-файла, в которую входят следующие сервисы:
-
Kafka
-
Kafdrop
-
Zookeeper
-
Kafka UI
-
Clickhouse
-
Flink-jobmanager
-
Flink-taskmanager
-
TSP
Создадим директорию на локальной машине и перенесем туда docker-compose файл.

Затем мы воспользуемся терминалом и перейдем в данную директорию.

Пока поднимается данная сборка, расскажу про возможности самого сервиса.
TSP (Time Series Patterns) — аналитический сервис для поиска шаблонов в данных временных рядов большого объема. Имеет гибкий DSL для описания паттернов (правил) и представляет собой оптимизированный движок потоковой обработки поверх Apache Flink. Этот сервис может применяться для потоковой обработки данных, а также для частичной обработки данных.
Принцип работы такой: есть исходные данные, на них пишутся правила (определенные условия поиска данных). Далее эти правила применяются на данных с помощью запроса, в котором мы указываем адреса источников данных, диапазон данных и условие проверки. Как итог, мы получаем запись о том, что найдено в соответствии правилу на определенном отрезке данных. Для запуска потоковой обработки (stream) используется kafka в качестве источника данных и хранилища инцидентов. Для запуска обработки данных по частям (batch) в качестве хранилища данных используется база данных clickhouse. В этой статье мы рассмотрим оба варианта запуска сервиса.
После того как будет доступен адрес http://localhost:8081/ на локальной машине, можно приступать к настройке.

Настройка запуска в режиме batch
Начнем с настройки режима с запуском правил на части данных. Для этого нам понадобится клиент базы данных. Я покажу на примере клиента DataGrip https://www.jetbrains.com/ru-ru/datagrip/.
Создаем соединение со стандартными настройками clickhouse (username = default, host = localhost, database = default).

Открываем консоль и создаем две таблицы.
Таблицу для исходных данных (телеметрии):
create table if not exists test_telemetry ( dt DateTime default toDateTime(ts), ts Float64, sensor_id String, n UInt32, value_float Float32, equipment_id UInt32, engine_no String ) engine = ReplacingMergeTree() PARTITION BY toYYYYMM(dt) ORDER BY (equipment_id, sensor_id, n, ts) SETTINGS index_granularity = 8192;
И таблицу для хранения инцидентов:
create table if not exists events_v2_event ( uuid UUID default generateUUIDv4(), date_from DateTime64(6) default '0000000000.000000', date_to DateTime64(6) default '0000000000.000000', processing_date DateTime64(6) default now(), type_id UInt16, equipment_id UInt32, ph_node_id UInt32, algorithm_id UInt32, value Nullable(Float32), context Nullable(String), status UInt8 default 1, duration Float64 materialized (toUnixTimestamp64Milli(date_to) - toUnixTimestamp64Milli(date_from)) / 1000 ) engine = MergeTree() PARTITION BY (toYYYYMM(date_from), intDiv(equipment_id, 1000)) ORDER BY date_from SETTINGS index_granularity = 8192;
Далее заполним тестовыми данными таблицу телеметрии с помощью следующих запросов:
INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor_2', 1, 10, 1, '1'); default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor_2', 1, 10, 1, '1'); INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor_2', 1, 10, 1, '1'); INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor_1', 1, 10, 1, '1'); INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor_1', 1, 10, 1, '1'); INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor_1', 1, 10, 1, '1'); INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor', 1, 10, 1, '1'); INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor', 1, 10, 1, '1'); INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor', 1, 10, 1, '1');
Теперь мы проверим работоспособность нашей системы выполнив REST запрос в сам TSP.
Его можно выполнить, используя клиент для Rest. Я использую POSTMAN
Url: http://localhost:8080/streamJob/from-jdbc/to-jdbc/?run_async=1
Body:
{ "sink": { "jdbcUrl": "jdbc:clickhouse://default:@clickhouse:8123/default", "rowSchema": { "toTsField": "date_to", "fromTsField": "date_from", "unitIdField": "equipment_id", "appIdFieldVal": [ "type_id", 1 ], "patternIdField": "algorithm_id", "subunitIdField": "ph_node_id" }, "tableName": "events_v2_event", "driverName": "ru.yandex.clickhouse.ClickHouseDriver" }, "uuid": "59338fc6-087a-4f49-a6f8-f0f96cc31bcd_1", "source": { "query": "SELECT ts, equipment_id AS equipment_id, sensor_id, value_float \nFROM test_telemetry \nWHERE equipment_id IN (1) AND ts >= 1635157169.0 AND ts < 1635157171.0 ORDER BY ts ASC", "jdbcUrl": "jdbc:clickhouse://default:@clickhouse:8123/default", "sourceId": 1, "driverName": "ru.yandex.clickhouse.ClickHouseDriver", "unitIdField": "equipment_id", "datetimeField": "ts", "partitionFields": [ "equipment_id" ], "dataTransformation": { "type": "NarrowDataUnfolding", "config": { "keyColumn": "sensor_id", "defaultTimeout": 60000, "fieldsTimeoutsMs": {}, "defaultValueColumn": "value_float" } } }, "patterns": [ { "id": "1", "payload": { "subunit": "4" }, "sourceCode": "\"test_sensor\" = 10 andThen (\"test_sensor_1\" = 20 or \"test_sensor_2\" = 10)", "forwardedFields": [ "equipment_id" ] } ] }

После получения успешного ответа мы переходим в ui-флинк и ждем смены статуса job с running на finished.

Далее мы переходим в базу данных, делаем запрос к таблице: select * from events_v2_event; и получаем строчку данных, которая соответствует условию:

Настройка запуска в режиме stream
Рассмотрим пример запуска в потоке. Различие в том, что запущенная задача не завершается, а ждет новой порции данных в топике, указанном в запросе.
Для начала мы идем в kafka ui и создаем два топика:
топик для исходных данных, в данном примере название “test” и топик для инцидентов — events_v2_event.
Пример:


Далее с помощью python скрипта и csv файла с тестовыми данными мы заполняем данными топик test.
Для этого мы создаем файл test_telemetry.csv со следующими данными:
"dt","ts","sensor_id","n","value_float","equipment_id","engine_no" 2021-10-25 13:23:21,1635157169,"test_sensor_2",1,10.0,1,"1" 2021-10-25 13:23:21,1635157170,"test_sensor_2",1,10.0, 1,"1" 2021-10-25 13:23:23,1635157171,"test_sensor_2",1,10.0,1,"1" 2021-10-25 13:23:21,1635157169,"test_sensor_1",1,10.0,1,"1" 2021-10-25 13:23:21,1635157170,"test_sensor_1",1,10.0,1,"1" 2021-10-25 13:23:23,1635157171,"test_sensor_1",1,10.0,1,"1" 2021-10-25 13:23:21,1635157169,"test_sensor",1,10.0,1,"1" 2021-10-25 13:23:21,1635157170,"test_sensor",1,10.0,1,"1" 2021-10-25 13:23:23,1635157171,"test_sensor",1,10.0,1,"1"
Далее мы создаем python-file со следующим содержимым:
import pandas as pd from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='10.83.0.3:9092') df = pd.read_csv('test_telemetry.csv') df.apply(lambda x: producer.send(topic='test',value=x.to_json().encode('utf-8')), axis=1)
Устанавливаем python >= 3.8, виртуальное окружение и библиотеку kafka-python.
И запускаем сам скрипт для того, чтобы перенести эти данные в топик.
И теперь мы можем отправить запрос на запуск task со следующим body и url.
Url: http://localhost:8080/streamJob/from-kafka/to-kafka/?run_async=1
Body:
{ "sink": { "topic": "events_v2_event", "broker": "10.83.0.3:9092", "rowSchema": { "toTsField": "date_to", "fromTsField": "date_from", "unitIdField": "equipment_id", "appIdFieldVal": [ "type_id", 1 ], "patternIdField": "algorithm_id", "subunitIdField": "ph_node_id", "incidentIdField": "uuid" }, "parallelism": 1, "datetimeField": "ts", "eventsMaxGapMs": 60000, "defaultEventsGapMs": 2000, "numParallelSources": 1, "patternsParallelism": 1 }, "uuid": "test_status", "source": { "group": "local_1", "topic": "test", "brokers": "10.83.0.3:9092", "sourceId": 1, "fieldsTypes": { "ts": "float64", "sensor_id": "String", "n": "int32", "value_float": "float32", "equipment_id": "int32", "engine_no": "String" }, "unitIdField": "equipment_id", "datetimeField": "ts", "partitionFields": [ "equipment_id" ], "dataTransformation": { "type": "NarrowDataUnfolding", "config": { "keyColumn": "sensor_id", "defaultTimeout": 60000, "fieldsTimeoutsMs": {}, "defaultValueColumn": "value_float", "valueColumnMapping": {} } } }, "patterns": [ { "id": "1", "subunit": 2, "sourceCode": "\"test_sensor\" = 10 andThen (\"test_sensor_1\" = 20 or \"test_sensor_2\" = 10)", "forwardedFields": [ "equipment_id" ] } ] }
Подождав некоторое время, мы можем перейти в топик с инцидентами и обнаружим там запись об инциденте:

Итоги:
На примере настройки и установки сервиса через docker мы рассмотрели решение проблемы тестирования приложений на локальной машине и убедились, что это возможно при наличии определенных знаний по системам виртуализации. Конечно, тестирование в реальной среде (в кластере) имеет свои особенности, но большую часть функционала можно проверить именно на локальной машине. Я намеренно упустил шаги по созданию самой конфигурации, так как на просторах интернета есть множество статей как это сделать.
Всем спасибо за внимание!
ссылка на оригинал статьи https://habr.com/ru/company/factory5/blog/648855/
Добавить комментарий