Сказ о некластерном тестировании сервисов

от автора

Эта статья является живым очерком о буднях современного тестировщика в agile-команде и рассматривает подход в тестировании, при котором описывается вариант решения проблемы настройки и разворачивания тестируемого приложения. В качестве примера возьмем сервис для поиска данных во временных рядах — Time Series Pattern, разрабатываемый нами на аутсорс. 

Меня зовут Роман Мостафин, и я работаю QA-инженером в компании Factory5. 
Начинаем. 

На данный момент многие команды сталкиваются с тем, что сложно обслуживать среду для тестирования в виде стендов, а также с многопользовательским доступом к одним и тем же данным и версионирование этих стендов. Порою эта проблема стоит довольно остро. Решением может быть поднятие сервисов и части систем на локальной машине. К его плюсам относится: легкость версионирования, быстрота разворачивания и настройки. Данное решение применяется в нашей компании для тестирования отдельных сервисов. И сейчас я расскажу, как мы его используем на примере нашего сервиса разрабатываемого на аутсорсе.  

Установка сервисов 

Для начала нам понадобится утилита Docker — система контейнеризации и виртуализации. 

Команда сделала сборку в виде докер-файла, в которую входят следующие сервисы: 

  • Kafka 

  • Kafdrop 

  • Zookeeper 

  • Kafka UI 

  • Clickhouse 

  • Flink-jobmanager 

  • Flink-taskmanager 

  • TSP 

Создадим директорию на локальной машине и перенесем туда docker-compose файл.

Затем мы воспользуемся терминалом и перейдем в данную директорию. 

Пока поднимается данная сборка, расскажу про возможности самого сервиса. 

TSP (Time Series Patterns) — аналитический сервис для поиска шаблонов в данных временных рядов большого объема. Имеет гибкий DSL для описания паттернов (правил) и представляет собой оптимизированный движок потоковой обработки поверх Apache Flink. Этот сервис может применяться для потоковой обработки данных, а также для частичной обработки данных. 

Принцип работы такой: есть исходные данные, на них пишутся правила (определенные условия поиска данных). Далее эти правила применяются на данных с помощью запроса, в котором мы указываем адреса источников данных, диапазон данных и условие проверки. Как итог, мы получаем запись о том, что найдено в соответствии правилу на определенном отрезке данных. Для запуска потоковой обработки (stream) используется kafka в качестве источника данных и хранилища инцидентов. Для запуска обработки данных по частям (batch) в качестве хранилища данных используется база данных clickhouse. В этой статье мы рассмотрим оба варианта запуска сервиса. 

После того как будет доступен адрес http://localhost:8081/ на локальной машине, можно приступать к настройке. 

Настройка запуска в режиме batch 

Начнем с настройки режима с запуском правил на части данных. Для этого нам понадобится клиент базы данных. Я покажу на примере клиента DataGrip https://www.jetbrains.com/ru-ru/datagrip/. 

Создаем соединение со стандартными настройками clickhouse (username = default, host = localhost, database = default).

Открываем консоль и создаем две таблицы. 

Таблицу для исходных данных (телеметрии): 

create table if not exists test_telemetry   (   dt DateTime default toDateTime(ts),   ts Float64,   sensor_id String,   n UInt32,   value_float Float32,   equipment_id UInt32,   engine_no String   )   engine = ReplacingMergeTree()   PARTITION BY toYYYYMM(dt)   ORDER BY (equipment_id, sensor_id, n, ts)   SETTINGS index_granularity = 8192; 

И таблицу для хранения инцидентов: 

create table if not exists events_v2_event   (   uuid UUID default generateUUIDv4(),   date_from DateTime64(6) default '0000000000.000000',   date_to DateTime64(6) default '0000000000.000000',   processing_date DateTime64(6) default now(),   type_id UInt16,   equipment_id UInt32,   ph_node_id UInt32,   algorithm_id UInt32,   value Nullable(Float32),   context Nullable(String),   status UInt8 default 1,   duration Float64 materialized (toUnixTimestamp64Milli(date_to) - toUnixTimestamp64Milli(date_from)) / 1000   )   engine = MergeTree()   PARTITION BY (toYYYYMM(date_from), intDiv(equipment_id, 1000))   ORDER BY date_from   SETTINGS index_granularity = 8192; 

Далее заполним тестовыми данными таблицу телеметрии с помощью следующих запросов: 

INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor_2', 1, 10, 1, '1');   default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor_2', 1, 10, 1, '1');   INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor_2', 1, 10, 1, '1');   INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor_1', 1, 10, 1, '1');   INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor_1', 1, 10, 1, '1');   INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor_1', 1, 10, 1, '1');   INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157169, 'test_sensor', 1, 10, 1, '1');   INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:21', 1635157170, 'test_sensor', 1, 10, 1, '1');   INSERT INTO default.test_telemetry (dt, ts, sensor_id, n, value_float, equipment_id, engine_no) VALUES ('2021-10-25 13:23:23', 1635157171, 'test_sensor', 1, 10, 1, '1'); 

Теперь мы проверим работоспособность нашей системы выполнив REST запрос в сам TSP. 

Его можно выполнить, используя клиент для Rest. Я использую POSTMAN 

Url: http://localhost:8080/streamJob/from-jdbc/to-jdbc/?run_async=1  

Body: 

{ "sink": {   "jdbcUrl": "jdbc:clickhouse://default:@clickhouse:8123/default",   "rowSchema": {   "toTsField": "date_to",   "fromTsField": "date_from",   "unitIdField": "equipment_id",   "appIdFieldVal": [   "type_id",   1   ],   "patternIdField": "algorithm_id",   "subunitIdField": "ph_node_id"   },   "tableName": "events_v2_event",   "driverName": "ru.yandex.clickhouse.ClickHouseDriver"   },   "uuid": "59338fc6-087a-4f49-a6f8-f0f96cc31bcd_1",   "source": {   "query": "SELECT ts, equipment_id AS equipment_id, sensor_id, value_float \nFROM test_telemetry \nWHERE equipment_id IN (1) AND ts >= 1635157169.0 AND ts < 1635157171.0 ORDER BY ts ASC",   "jdbcUrl": "jdbc:clickhouse://default:@clickhouse:8123/default",   "sourceId": 1,   "driverName": "ru.yandex.clickhouse.ClickHouseDriver",   "unitIdField": "equipment_id",   "datetimeField": "ts",   "partitionFields": [   "equipment_id"   ],   "dataTransformation": {   "type": "NarrowDataUnfolding",   "config": {   "keyColumn": "sensor_id",   "defaultTimeout": 60000,   "fieldsTimeoutsMs": {},   "defaultValueColumn": "value_float"   }   }   },   "patterns": [   {   "id": "1",   "payload": {   "subunit": "4"   },   "sourceCode": "\"test_sensor\" = 10 andThen (\"test_sensor_1\" = 20 or \"test_sensor_2\" = 10)",   "forwardedFields": [   "equipment_id"   ]   }   ]   } 

После получения успешного ответа мы переходим в ui-флинк и ждем смены статуса job с running на finished.

Далее мы переходим в базу данных, делаем запрос к таблице: select * from events_v2_event; и получаем строчку данных, которая соответствует условию:

Настройка запуска в режиме stream 

Рассмотрим пример запуска в потоке. Различие в том, что запущенная задача не завершается, а ждет новой порции данных в топике, указанном в запросе. 

Для начала мы идем в kafka ui и создаем два топика: 

топик для исходных данных, в данном примере название “test” и топик для инцидентов — events_v2_event. 

Пример: 

Далее с помощью python скрипта и csv файла с тестовыми данными мы заполняем данными топик test. 

Для этого мы создаем файл test_telemetry.csv со следующими данными: 

"dt","ts","sensor_id","n","value_float","equipment_id","engine_no"  2021-10-25 13:23:21,1635157169,"test_sensor_2",1,10.0,1,"1"  2021-10-25 13:23:21,1635157170,"test_sensor_2",1,10.0, 1,"1"  2021-10-25 13:23:23,1635157171,"test_sensor_2",1,10.0,1,"1"  2021-10-25 13:23:21,1635157169,"test_sensor_1",1,10.0,1,"1"  2021-10-25 13:23:21,1635157170,"test_sensor_1",1,10.0,1,"1"  2021-10-25 13:23:23,1635157171,"test_sensor_1",1,10.0,1,"1"  2021-10-25 13:23:21,1635157169,"test_sensor",1,10.0,1,"1"  2021-10-25 13:23:21,1635157170,"test_sensor",1,10.0,1,"1"  2021-10-25 13:23:23,1635157171,"test_sensor",1,10.0,1,"1" 

Далее мы создаем python-file со следующим содержимым: 

import pandas as pd  from kafka import KafkaProducer  producer = KafkaProducer(bootstrap_servers='10.83.0.3:9092')    df = pd.read_csv('test_telemetry.csv')    df.apply(lambda x: producer.send(topic='test',value=x.to_json().encode('utf-8')), axis=1)   

Устанавливаем python >= 3.8, виртуальное окружение и библиотеку kafka-python. 

И запускаем сам скрипт для того, чтобы перенести эти данные в топик.  

 И теперь мы можем отправить запрос на запуск task со следующим body и url.

Url: http://localhost:8080/streamJob/from-kafka/to-kafka/?run_async=1 

Body:

{                           "sink": {                               "topic": "events_v2_event",                               "broker": "10.83.0.3:9092",                               "rowSchema": {                                   "toTsField": "date_to",                                   "fromTsField": "date_from",                                   "unitIdField": "equipment_id",                                   "appIdFieldVal": [                                       "type_id",                                       1                                   ],                                   "patternIdField": "algorithm_id",                                   "subunitIdField": "ph_node_id",                                   "incidentIdField": "uuid"                               },                               "parallelism": 1,                               "datetimeField": "ts",                               "eventsMaxGapMs": 60000,                               "defaultEventsGapMs": 2000,                               "numParallelSources": 1,                               "patternsParallelism": 1                           },                           "uuid": "test_status",                           "source": {                               "group": "local_1",                               "topic": "test",                               "brokers": "10.83.0.3:9092",                               "sourceId": 1,                               "fieldsTypes": {                                   "ts": "float64",                                   "sensor_id": "String",                                   "n": "int32",                                   "value_float": "float32",                                   "equipment_id": "int32",                                   "engine_no": "String"                               },                               "unitIdField": "equipment_id",                               "datetimeField": "ts",                               "partitionFields": [                                   "equipment_id"                               ],                               "dataTransformation": {                                   "type": "NarrowDataUnfolding",                                   "config": {                                       "keyColumn": "sensor_id",                                       "defaultTimeout": 60000,                                       "fieldsTimeoutsMs": {},                                       "defaultValueColumn": "value_float",                                       "valueColumnMapping": {}                                   }                               }                           },                           "patterns": [                               {                                   "id": "1",                                   "subunit": 2,                                   "sourceCode": "\"test_sensor\" = 10 andThen (\"test_sensor_1\" = 20 or \"test_sensor_2\" = 10)",                                   "forwardedFields": [                                       "equipment_id"                                  ]                               }                           ]                       } 

Подождав некоторое время, мы можем перейти в топик с инцидентами и обнаружим там запись об инциденте:

Итоги: 

На примере настройки и установки сервиса через docker мы рассмотрели решение проблемы тестирования приложений на локальной машине и убедились, что это возможно при наличии определенных знаний по системам виртуализации. Конечно, тестирование в реальной среде (в кластере) имеет свои особенности, но большую часть функционала можно проверить именно на локальной машине. Я намеренно упустил шаги по созданию самой конфигурации, так как на просторах интернета есть множество статей как это сделать.  

Всем спасибо за внимание! 


ссылка на оригинал статьи https://habr.com/ru/company/factory5/blog/648855/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *