![](https://habrastorage.org/getpro/habr/upload_files/d15/b28/1fa/d15b281faf477d64c415afb72a76aa3b.png)
![](https://habrastorage.org/getpro/habr/upload_files/97a/792/4b6/97a7924b62b9be7f129abd1951ae7b53.jpg)
Автор статьи: Андрей Поляков
Старший разработчик в Unlimint
Хабр, добрый день! Что первое приходит в голову, когда вы слышите “ETL”? Скорее всего airflow?
По сути airflow — это стандарт де-факто в мире обработки и трансформации данных. Но в случае если в разработка ведется на java, то тогда придется либо переучивать разработчиков на python, либо нанимать еще python разработчиков. В большинстве случаев хочется вносить минимум изменений в процесс разработки и тех. стек. Решение есть! Фреймворк Spring Cloud предоставляет DataFlow — фреймворк для организации ETL в spring среде.
Архитектура
Фреймворк DataFlow состоит из двух основных компонентов:
-
Data Flow Server
-
Skipper Server (шкипер? ?)
Высокоуровневая архитектура показана на схеме ниже:
![](https://habrastorage.org/getpro/habr/upload_files/e0a/348/105/e0a348105e51904735de4a16836f578d.png)
Основная точка входа для взаимодействия с DataFlow — это REST API сервиса Data Flow Server. Web Dashboard и Data Flow Shell также взаимодействуют с Data Flow Server. Система может работать на различных платформах: Cloud Foundry, Kubernetes или на вашем локальном компьютере.
Обратите внимание, что и Data Flow Server, и Skipper Server хранят данные (и свое внутреннее состояние) в своей собственной РСУБД. Можно подключить одну физическую СУБД к обоим сервисам, так как каждый из них создает свой собственный namespace/schema внутри СУБД. По умолчанию каждый из сервисов использует встроенную базу данных H2.
Давайте познакомимся поближе с Data Flow Server. Он отвечает практически за все ?
-
парсинг задач (job-ов) для стриминга и батчинга на основе своего собственного DSL (о нем чуть позже);
-
валидация, сохранение описаний задач стриминга и батчинга;
-
регистрация артефактов (таких как jar файлы и докер образы), которые испльзуются в шагах пайплайнов;
-
деплой задач;
-
планирование работы задач;
-
получение истории выполнения стриминговых и батч-задач;
-
конфигурирование свойств задач (inputs/outputs, начальное количество инстансов, разделение данных и т.д.);
-
делегирование деплоя стриминговых задач сервису Skipper;
-
аудит (таких событий как создание стрима, деплой и уничтожение батч-задач);
-
и многое друге.
Впечатляющий список, не правда ли?
Другой компонент системы — Skipper Server отвечает в основном за выполнение стриминговых задач.
-
Развертывание потоков на одной или нескольких платформах.
-
Обновление и откат потоков на одной или нескольких платформах с использованием стратегии синего/зеленого деплоя.
-
Хранение истории файла манифеста каждого потока (который представляет окончательное описание того, какие приложения были развернуты).
Строим ETL
Обычно ETL пайплайны в SCDF (Spring Cloud DataFlow) состоят из 3-х компонентов:
-
Source — извлечение данных;
-
Processor — преобразование данных;
-
Sink — загрузка.
Давайте рассмотрим пример ETL-пайплайна:
-
Данные получаем по HTTP (source).
-
Преобразуем и насыщаем данные (processor).
-
-
Сохраняем данные в лог (sink).
-
Проведение анализа данных (processor).
-
-
Сохраняем данные в файл (sink).
![](https://habrastorage.org/getpro/habr/upload_files/e17/5a4/bbc/e175a4bbc16cfe01fb537beeefdafc11.png)
Для передачи данных между блоками ETL‑пайплайна используется так называемый messaging middleware — промежуточный слой брокера сообщений. Spring Cloud Data Flow поддерживает два брокера сообщений в качестве messaging middleware — Apache Kafka and RabbitMQ.
Для запуска Spring Cloud Data Flow удобнее всего использовать docker‑compose (хотя это далеко не единственный способ!). Обратимся к официальной документации проекта SCDF и получим ссылки для скачивания файлов docker-compose
:
curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/main/src/docker-compose/docker-compose.yml -o docker-compose.yml curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/main/src/docker-compose/docker-compose-<broker>.yml -o docker-compose-<broker>.yml curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/main/src/docker-compose/docker-compose-<database>.yml -o docker-compose-<database>.yml
Здесь:
<broker>
— это брокер сообщений (либо kafka, либо rabbitmq).
<database>
— база данных, которая будет использоваться Data Flow Server и Skipper Server (может быть postgres, mariadb или mysql).
Запускаем скачанные docker-compose
, перед запуском необходимо установить через переменные окружения версии сервисов SCDF, которые будут использоваться. Например:
set DATAFLOW_VERSION=2.10.1& set SKIPPER_VERSION=2.9.1
После того как сервисы в docker‑compose запустились, они будут доступны по следующим портам:
Host ports |
Container ports |
Description |
9393 |
9393 |
На этом порту работает Data Flow server. Можно открыть дашборд по адресу http://localhost:9393/dashboard или взаимодействовать с REST API по адрес http://localhost:9393 |
7577 |
7577 |
На этом порту работает Skipper server. Можно повзаимодействовать с Skipper REST API по адресу http://localhost:7577/api |
Важное примечание:
Сейчас Spring Cloud Data Flow по умолчанию использует Java 8 для запуска приложений. Если вы хотите запускать приложения, собранные с Java 17, нужно установить значение переменной окружения BP_JVM_VERSION
:
export BP_JVM_VERSION=-jdk17
Также вам скорее всего понадобится возможность отладки приложений и пайплайнов. К сожалению, это не такая простая задача, но в целом очень подробно описана в документации: Customizing Docker Compose. Также там же описывается, как добавить zipkin для распределенной трассировки ваших пайплайнов.
Давайте начнем знакомство с веб‑ui системы SCDF.
Здесь мы видим несколько разделов:
-
приложения
-
потоковые пайплайны
-
задачи (tasks)
-
управление и аудит
![](https://habrastorage.org/getpro/habr/upload_files/1f1/1c1/030/1f11c1030d9e1c400b619e1293358b5a.png)
Самая первая вкладка — приложения. Здесь мы можем загружать «приложения» — составные блоки нашего будущего ETL‑пайплайна.
Нажмем на кнопку «добавить приложение». Открывается окно с выбором способа добавления приложения:
![](https://habrastorage.org/getpro/habr/upload_files/2ec/5dd/328/2ec5dd328c2fbddfed506693574d8816.png)
В самом простом случае нас интересует вкладка «зарегистрировать одно или несколько приложений». Указываем название приложения (скорее всего будет совпадать с названием шага в ETL‑пайплайне, которому оно соответствует), выбираем тип приложения и URI. Обратите внимание, что также нужно указать Metadata URI с мета‑данными вашего приложения.
![](https://habrastorage.org/getpro/habr/upload_files/485/5d2/f01/4855d2f010975e071528e198b9227359.png)
Добавили приложение? Ок, двигаемся дальше и приступаем к созданию пайплайна.
Для этого нужно перейти в раздел “Потоки” и нажимаем на кнопку “создать потоки”.
![](https://habrastorage.org/getpro/habr/upload_files/db0/04a/02c/db004a02cac49a4801d434c4e7503a0b.png)
Есть два способа описать ETL пайплайн:
-
в визуальном редакторе на форме;
-
с помощью DSL‑языка.
Неважно, какой из способов вы выберете, второй подход будет автоматически добавлен тоже.
Например:
![](https://habrastorage.org/getpro/habr/upload_files/955/99d/fc6/95599dfc6e657f3657df83815be41217.png)
Поток был создан с помощью визуального редактора и при этом автоматически продублирован в виде DSL:
STREAM-1=http | header-enricher | log :STREAM-1.header-enricher > transform | groovy | file
Нажимаем “создать потоки” и поток отобразится в таблице.
![](https://habrastorage.org/getpro/habr/upload_files/088/e93/aa2/088e93aa23853730d8d6c01305a5016c.png)
Обратите внимание: в таблице отобразились два отдельных потока. При этом конвейер данных у нас один (т.е. оба потока связаны друг с другом и данные из одного передаются в другой):
![](https://habrastorage.org/getpro/habr/upload_files/39e/dff/12e/39edff12eb2398a6812d19ed1542a0e3.png)
Дальше можно запускать сам процесс!
Предположим, что поток данных пошел и наш поток может начать их принимать и обрабатывать.
Переходим к нашему потоку и видим, что он еще не задеплоен.
![](https://habrastorage.org/getpro/habr/upload_files/5bf/fe0/9c2/5bffe09c2779588e68402a44da61e791.png)
![](https://habrastorage.org/getpro/habr/upload_files/73c/969/67d/73c96967da6ff89666906cbfd4baf83a.png)
Первое, что нужно сделать — это задеплоить поток. Нажимаем кнопку «задеплоить» и переходим на форму, на которой необходимо указать параметры развертывания для потока целиком и для каждого из его компонентов:
![](https://habrastorage.org/getpro/habr/upload_files/216/ec5/ac2/216ec5ac2be84741ec984b1cfcd81613.png)
Отлично! Ваш поток развернут и готов к работе.
Как проверить работу пайплайна?
Для созданного примера достаточно, чтобы по указанному http адресу появились данные, которые наш ETL пайплайн сможет получить и обработать. В конце можем проверить успешность пайплайна, если создался файл с преобразованными данными и создалась запись в логе.
Пару слов про свои собственные приложения
В рассмотренном выше примере мы использовали стандартные приложения, которые уже сделаны за нас и их можно скачать. А что делать, если нужно выполнить логику, для которой не подходят уже существующие приложения? Написать свою.
Давайте посмотрим на примере приложения‑processor (у которого есть ендпойнт для получения входных данных и для проброса данных дальше).
Такое приложение можно уместить в одном файле. Например:
@EnableBinding(Processor.class) @SpringBootApplication public class MyProcessorApplication { @Autowired SomeComponent someComponent; public static void main(String[] args) { SpringApplication.run(MyProcessorApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public Object processMessage(Message<?> message) { System.out.println(message); MessageHeaders header = message.getHeaders(); byte[] body = (byte[]) message.getPayload(); // do some logic and transformations someComponent.doSomething(header, body); return message; } }
После того, как код приложения написан, можно зарегистрировать его в Data Flow Server через веб‑ui или через shell. Так как через ui мы уже видели интерфейс, приведу пример, как зарегистрировать через shell:
app register --name myProcessor --type processor --uri maven://com.example:my-processor:jar:0.0.1-SNAPSHOT
После этого при создании потоков можно использовать наше только что зарегистрированное приложение наравне с другими.
Подведем итоги: для java‑мира есть довольно удобная, мощная и гибкая альтернатива для airflow и можно ее использовать для построения ETL‑пайплайнов, конвейеров потоковой обработки данных и процессов обработки тяжелых единоразовых задач.
Ну и напоследок приглашаю всех на бесплатный урок, где мы рассмотрим такой принцип проектирования программного обеспечения, как «Соглашения по конфигурации», Поговорим про «Скриптовый Spring» и затронем не менее важную тему «Безопасного программирования».
ссылка на оригинал статьи https://habr.com/ru/articles/718572/
Добавить комментарий