Микросервисный фреймворк Flogo — собираем систему из кирпичиков

от автора

Микросервисная архитектура, предполагающая разделение информационной системы на небольшие самостоятельные части, взаимодействующие посредством сетевых протоколов (чаще всего HTTP с пересылкой JSON-сообщений или через gRPC) имеет особое значение для создания масштабируемых отказоустойчивых приложений. Для реализации микросервисов существует множество библиотек и фреймворков, созданных для всех актуальных технологий разработки (исчерпывающий обновляемый список можно посмотреть например здесь).

Для разработчиков доступны как библиотеки реализации протоколов и веб-серверов, так и полноценные платформы, обеспечивающие не только механизмы обнаружения микросервисов, но и поддержку протоколирования и отслеживания цепочки запросов, контроля доступа, мониторинга доступности и производительности, маршрутизацию запросов и многое другое.

Но особый интерес для разработки микросервисов представляют фреймворки, формирующие среду выполнения (и иногда и инструменты конфигурирования) для быстрого развертывания кода, основанного на модели реакции на внешние и внутренние события. Например, среди известных фреймворков можно назвать Akka (для запуска акторов на JVM), Oracle Helidon (предоставляет большой набор вспомогательных сервисов для мониторинга, контроля доступа и поддержки различных протоколов) и Vert.X (фреймворк для создания реактивных приложений на JVM). Перечисленные фреймворки ориентированы на использование кода, создаваемого на основе JVM-совместимого языка программирования, но аналогичные решения есть и для других технологий разработки. Например, для создания микросервисной архитектуры на Go, может использоваться фреймворк Flogo, основанный на идеях потока сообщений/данных между микросервисами и реакции на события (официальный сайт). В этой статье мы рассмотрим его возможности на примере простой задачи обработки данных телеметрии.

Сам фреймворк представляет из себя набор инструментов командной строки, веб-конструктор для создания action и набор библиотек, которые могут быть подключены к коду на Go для взаимодействия с возможностями фреймворка. Кроме того, что фреймворк обеспечивает механизм последовательных операций (некоторого конвейера), основанный на триггерах, а также в нем “из коробки” поддерживается использование предварительно обученных нейронных сетей на основе TensorFlow для принятия решений о выборе последовательности операций. 

Для конфигурирования фреймворка можно установить веб-интерфейс

docker run -itd -p 3303:3303 flogo/flogo-docker eula-accept

или для запуска наиболее актуальной (нестабильной) версии с поддержкой машинного обучения:

docker run -itd -p 3303:3303 flogo/flogo-docker:unstable-ml eula-accept

Также возможно установить набор инструментов командной строки

go get -u github.com/project-flogo/cli/…

Центральным объектом конфигурирования в Flogo является приложение (Application), оно включает в себя зарегистрированные действия (Actions) и связанные с ними триггеры для запуска действия (Triggers). Начнем с создания нового приложения, для этого выполним переход на страницу http://localhost:3303/ и выберем “New” (создание приложения)

Action может работать с потоком данных (Stream) и формировать на основе набора входных данных (с использованием механизма map-filter-reduce) новый поток данных через канал, который может стать источником событий для другого Action. Второй вид Action — поток выполнения (flow), который может рассматриваться как высокоуровневое описание координации микросервисов со связыванием входным и выходных данных к контексту Action.

Action состоит из последовательности активностей (Activity), которые могут быть установлен из git-репозиториев определенной структуры, которая будет описана ниже.

Определим основные сущности Flogo:

  • модели (models) — описывают структуру входных и выходных параметров activity

  • триггеры (triggers) — определяют условия запуска Flow

  • активности (activities) — определяют реализацию шага бизнес-логики

Большое количество готовых Activity можно найти на странице https://tibcosoftware.github.io/flogo/showcases/ и https://github.com/project-flogo/contrib. Аналогично и models и triggers также могут быть установлены из внешнего источника через веб-интерфейс (Install Contribution) или через командную строку flogo install <url>. Среди встроенных Activity можно выделить следующие:

  • Start a subflow — запустить вспомогательный flow при достижении этой activity

  • Run an Action — запустить другой Action (flow/stream)

  • Log — добавить сообщение в журнал

  • Mapper / Filter / Aggregate — реализация обработки данных из входного потока (используется в action типа stream)

  • Throw Error — вернуть ошибку

  • Return — вернуть результат

  • SQL Database Activity — действия по манипуляции данными

  • REST Invoke — запустить внешний REST-сервис

Для запуска Activity может использовать один из встроенных триггеров или подключаемый из внешнего репозитория. Встроенные триггеры могут вызываться из командной строки (CLI Trigger), при получении HTTP-запроса (Receive HTTP Message), данных через TCP/UDP, а также по таймеру. Также можно обмениваться данными с функциями AWS Lambda.

Триггер связан с последовательностью активностей через input/output аргументы (input передаются из триггера в направлении первой активности, output отправляются в триггер и могут, например, выступить в роли кода ответа или содержания сообщения для HTTP-запроса). 

Activity может получать входящие данные с предыдущего шага (будут доступны через селектор $.name), а также создавать выходные данные для следующего шага. Активность Return создает результат выполнения последовательности активностей (в flow создается единственное значение результата, в stream может быть создан поток из результатов).

Для разработки триггеров, активностей и действий могут использоваться базовые структуры из github.com/project-flogo/core. Для описания взаимодействия с контекстом выполнения определяется список входных и выходных аргументов, а также настройки. Для описания полей могут использоваться как примитивные типы данных (string, int, bool), так и составные структуры и массивы (полный список типов приведен здесь). Точкой входа является функция init(), которая должна зарегистрировать структуру с описанием Activity/Action/Trigger. Также репозиторий должен содержать файл descriptor.json с описанием компонента. Метаданные описывают следующие атрибуты:

  • name — название компонента

  • type — тип компонента (flogo:activity, flogo:action, flogo:trigger)

  • version — версия компонента

  • title — название для списка компонентов

  • description — описание компонента

  • display.icon — пиктограмма для списка

  • input — схема входных данных (например, конфигурации или входного потока) для activity

  • output — схема выходных данных

  • handler — настройки для trigger

Мы попробуем разработать Flow для сохранения истории изменения данных с внешнего датчика температуры. Кроме этого мы будем использовать значения температуры для управления внешним исполнителем (кондиционером), который будет реализован в виде подключаемого драйвера на Go, который мы преобразуем в Activity.

Рассмотрим создание следующего сценария обработки потока входных данных с измерениями температуры:

  • данные поступают через HTTP-запросы;

  • производится усреднение окна из 5 последовательных замеров для сглаживания резких выбросов температуры;

  • усредненные замеры отправляются в InfluxDB;

  • выполняется вызов активности для управления кондиционером.

Первая часть обработки реализуется в модели поточной обработки данных. Создаем новое действие типа Stream, назовем его Conditioner Control Flow. Для активации потока добавим триггер для запуска (знак + слева от последовательности активностей) с использованием REST с типом Receive HTTP Message. Перейдем в конфигурацию триггера и определим порт, метод и путь публикации REST-точки подключения.

Для передачи полученных данных добавим в Stream Interface в Inputs именованное поле message и свяжем его в триггере с параметрами http-запроса.

Дальше добавим обработку входных данных и выполним усреднение, для этого добавим активность Aggregate и выполним ее настройку. Будет необходимо указать функцию агрегации (“avg”), режим определения окна (windowType = sliding), размер окна (количество записей для усреднения) windowSize=5. Далее будет необходимо указать способ преобразования входных данных в данные для агрегации. В нашем случае будет необходимо сделать преобразование в число с плавающей точкой coerce.toFloat64($.message). Flogo представляет большой набор функций для работы со строками, датами, генерации случайных чисел и извлечения данных из JSON, при этом все переменные в текущем контексте доступны через служебный объект $. Последним действием Stream должна быть передача вычисленного значения в канал (Channel), для которого необходимо сконфигурировать входной параметр name (например, “smoothed”) и значение (в данном случае оно будет $.result)

Для обработки усредненных значений температуры необходимо создать новый Action с типом Flow, создать входную переменную value для Action, задать стартовый триггер типа Listen to Channel (конфигурация: название — smoothed, Map to flow inputs — связать переменную value c $.data). 

Для отправки данных в InfluxDB необходимо установить дополнительный компонент github.com/shaliniGovindaNayak/flogo-workspace/activity/influxdb. После добавления компонента необходимо выполнить конфигурацию активности (адрес, порт и данные аутентификации к серверу InfluxDB) и указать название переменной с источником данных (» class=»formula inline»>.value). 

Но наибольший интерес представляет создание активности из существующего кода микросервиса. Для этого мы будем использовать возможности базовой библиотеки Flogo. Начнем с создания файла описания активности.

{   "name": "actuator",   "type": "flogo:activity",   "version": "0.0.1",   "title": "Actuator",   "description": "Conditioner Actuator",   "homepage": "https://github.com/dzolotov/actuator",   "settings": [],   ],   "input": [     {       "name": "temperature",       "type": "integer",       "required": true     },     {       "name": "state",       "type": "boolean",       "required": true     }   ],   "output": [     {       "name": "result",       "type": "string",       "required": true     }   ] }

Для подключения ядра и дополнительных функций Flogo подключим дополнительные модули. Сначала создадим описание метаданных (metadata.go).

package actuator  import (     "github.com/project-flogo/core/data/coerce"     "strconv" )  type Settings struct { }  type Input struct { Temperature uint64 `md:"temperature,required"` State bool `md:"state,required"` }  func (r *Input) FromMap(values map[string]interface{}) error { temperature, _ := coerce.ToInt64(values["temperature"])             state, _ := coerce.ToBool(values[“state”]) r.Temperature = temperature             r.State = state return nil }  func (r *Input) ToMap() map[string]interface{} { return map[string]interface{}{ "temperature": strconv.Itoa(r.Temperature),                        "state": strconv.FormatBool(r.State) } }  type Output struct { Result string `md:"result"` }  func (o *Output) FromMap(values map[string]interface{}) error { strVal, _ := coerce.ToString(values["result"]) o.Result = strVal return nil }  func (o *Output) ToMap() map[string]interface{} { return map[string]interface{}{ "result": o.Result, } }

и бизнес-логику (activity.go)

import ( "github.com/project-flogo/core/activity" )  //Точкой входа в activity является функция init() func init() { _ = activity.Register(&ActuatorActivity{}) }  //определение метаданные (структура входных и выходных параметров и настроек) var activityMd = activity.ToMetadata(&Settings{}, &Input{}, &Output{})  type ActuatorActivity struct { }  //получить метаданные func (a *ActuatorActivity) Metadata() *activity.Metadata { return activityMd }  //логика активности func (a *ActuatorActivity) Eval(ctx activity.Context) (done bool, err error) { //получение входных данных input := &Input{} err = ctx.GetInputObject(input) if err != nil { return true, err } temperature := input.Temperature state := input.State //реализация бизнес-логики //… //возврат значения output := &Output{Result: temperature} err = ctx.SetOutputObject(output) if err != nil { return true, err } return true, nil }

Для установки компонента загрузим репозиторий на github и выполним установку дополнительного компонента с адресом репозитория. После этого в списке активностей появится новая активность Actuator (после добавления нужно будет выполнить связывание или определение констант для входных значений и связывание выходных значений).

Итоговая конфигурация может быть выгружена в Web UI и экспортирована в JSON (Export -> App), либо собрана в выполняемый образ для целевой платформы (Linux/Mac/Windows в WebUI, а также любая поддерживаемая целевая платформа при сборке через CLI. 


Сегодня вечером в OTUS пройдет demo-урок «Тестирование в микросервисной архитектуре». На занятии расскажем про различные типы тестов и инструментов, используемых в тестировании, а также поговорим о том, как микросервисная архитектура изменила подходы к тестированию. Регистрация доступна для всех желающих по ссылке.


ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/657017/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *