
Привет, Хабр! На связи Александр Бобряков, техлид в команде МТС Аналитики. В предыдущих постах я рассказал, как собрать первое приложение Flink со Spring, реализовав пайплайн дедупликации сообщений Kafka-to-Kafka. В этом примере погружусь в использование таймеров в Flink, а в следующих статьях расскажу, как работать с более сложными состояниями, эволюционировать их схему и покрыть это все тестами.
Весь разбираемый исходный код есть в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии. Эта статья соответствует релизной ветке с названием release/7_Trigger_Flink_Job.
Это восьмой материал из моей серии про Apache Flink. По мере выхода новых ссылки на них будут появляться ниже.
Список моих статей про Flink:
Оглавление статьи:
Как отправить уведомление по таймеру
Рассмотрим новую бизнес-задачу, которую можно решить с использованием Apache Flink.
Источник данных (Source): Kafka-топик trigger-topic, содержащий сообщения-триггеры (TriggerMessage). В них находится информация о человеке, совершившем действие одного из двух типов: START или STOP.
START подразумевает создание триггера, а STOP — завершение. Например, START отправляется, когда пользователь добавил на сайте товар в корзину, а STOP — при оплате.
TriggerMessage представляет собой JSON:
{ "userId": "3ee2e515-2c82-4d5f-b9b9-8da5f54a7da1", "triggerName": "test_trigger_name", "status": "START", "deviceType": "Phone", "category": "Shop", "count": 5, "timestamp": 123, "data": { "field_1": "value_1" } }
Поле status определяет тип триггера.
Задача отправки уведомлений: отслеживать все триггеры и отправлять уведомление AlertMessage, когда триггер запустился и длится дольше десяти минут. Например, пришло TriggerMessage со статусом START, но в течение десяти минут нет сообщения STOP с аналогичным id человека и названием триггера. Такое уведомление сработает, если посетитель бросил заполненную корзину и покинул сайт.
Приемник данных (Sink): Kafka-топик alert-topic, требующий сообщения AlertMessage, образованного путем прямого сопоставления полей из TriggerMessage:
{ "userId": "3ee2e515-2c82-4d5f-b9b9-8da5f54a7da1", "triggerName": "test_trigger_name", "timestamp": 123 }
Целевой пайплайн
После построения джобы мы получим такой граф обработки данных:

Пайплайн начинается с Source из Kafka-топика trigger-topic. Затем мы фильтруем сообщения по полю status, пропускаем дальше те, где есть значения START или STOP, а также добавляем проверку на название триггера и наличие id клиента.
Далее пишем оператору, который будет следить за определенными событиями по ключу «название_триггера + id_клиента». При обработке START-сообщения оператор создаст таймер на десять минут, а по завершении отправит AlertMessage в дальнейший Sink — Kafka-топик alert-topic. Если STOP все-таки пришел, то AlertMessage не отправляем.
Построение Flink Job
В этом кейсе построение основных абстракций мало отличается от пайплайна из предыдущих статей про Kafka-to-Kafka с дедупликацией. Я быстро пробегусь по основным абстракциям каркаса пайплайна, так как самые интересные моменты скрыты в операторе, который будет следить за таймерами.
Определение DTO для сообщений
Сообщения на входе представляют собой структуру, соответствующую поставленной задаче:
@Data @Builder @Jacksonized @JsonIgnoreProperties(ignoreUnknown = true) public class TriggerMessage { @JsonPropertyDescription("User id") private UUID userId; @JsonPropertyDescription("Trigger name") private String triggerName; @JsonPropertyDescription("Trigger status") @JsonDeserialize(using = TriggerStatus.Deserializer.class) private TriggerStatus status; @JsonPropertyDescription("Device type") private String deviceType; @JsonPropertyDescription("Category") private String category; @JsonPropertyDescription("Trigger count") private int count; @JsonPropertyDescription("Timestamp") private Long timestamp; @JsonPropertyDescription("Trigger additional data") private Map<String, Object> data; }
На выходе ожидаем:
@Data @Builder @Jacksonized @JsonIgnoreProperties(ignoreUnknown = true) public class AlertMessage { @JsonPropertyDescription("User id") private UUID userId; @JsonPropertyDescription("Trigger name") private String triggerName; @JsonPropertyDescription("Timestamp") private Long timestamp; }
Определение настроек Kafka
В настройках Kafka добавятся trigger-topic и alert-topic:
kafka: group-id: group_id bootstrap-servers: localhost:29092 topics: click-topic: 'click-topic' trigger-topic: 'trigger-topic' alert-topic: 'alert-topic'
Определение Kafka Source/Sink
Source и Sink имплементируют созданные ранее абстракции SourceBinder и SinkProvider соответственно:
@Component @RequiredArgsConstructor class TriggerMessageKafkaSourceBinder implements SourceBinder<TriggerMessage> { private final KafkaProperties kafkaProperties; private final DeserializationSchema<TriggerMessage> deserializationClickMessageSchema; @Override public DataStream<TriggerMessage> bindSource(StreamExecutionEnvironment environment) { final var sourceName = "Trigger message Kafka source"; return environment.fromSource( KafkaSource.<TriggerMessage>builder() .setBootstrapServers(kafkaProperties.getBootstrapServers()) .setStartingOffsets(OffsetsInitializer.committedOffsets(EARLIEST)) .setGroupId(kafkaProperties.getGroupId()) .setTopics(kafkaProperties.getTopics().getTriggerTopic()) .setValueOnlyDeserializer(deserializationClickMessageSchema) .build(), WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)), sourceName ).name(sourceName).uid(sourceName + "_id"); } }
Разницы с представленным в предыдущих статьях Source для Kafka нет, за исключением используемого топика: создаем Source с помощью вспомогательного класса KafkaSourceBuilder, задаем в нем необходимые параметры, а также кастомный десериализатор сообщений TriggerMessage. Это решение основано на Jackson и реализовывает обязательные методы базового интерфейса Flink DeserializationSchema:
@Component @RequiredArgsConstructor class TriggerMessageDeserializationSchema implements DeserializationSchema<TriggerMessage> { private static final long serialVersionUID = 1L; private transient ObjectMapper objectMapper; @Override public void open(InitializationContext context) { objectMapper = createObjectMapper(); } @Override public TriggerMessage deserialize(byte[] message) throws IOException { return objectMapper.readValue(message, TriggerMessage.class); } @Override public boolean isEndOfStream(TriggerMessage nextElement) { return false; } @Override public TypeInformation<TriggerMessage> getProducedType() { return TypeInformation.of(TriggerMessage.class); } }
Аналогично задаем подключение к KafkaSink для alert-topic:
@Component @RequiredArgsConstructor public class AlertMessageKafkaSinkProvider implements SinkProvider<AlertMessage> { private final KafkaProperties kafkaProperties; private final SerializationSchema<AlertMessage> serializationProductMessageSchema; @Override public Sink<AlertMessage> createSink() { return KafkaSink.<AlertMessage>builder() .setBootstrapServers(kafkaProperties.getBootstrapServers()) .setRecordSerializer(KafkaRecordSerializationSchema.<AlertMessage>builder() .setTopic(kafkaProperties.getTopics().getAlertTopic()) .setValueSerializationSchema(serializationProductMessageSchema) .build()) .setDeliveryGuarantee(NONE) .build(); } }
Но используем новый сериализатор схемы AlertMessage:
@Component @RequiredArgsConstructor class AlertMessageSerializationSchema implements SerializationSchema<AlertMessage> { private static final long serialVersionUID = 1; private transient ObjectMapper objectMapper; @Override public void open(InitializationContext context) { objectMapper = createObjectMapper(); } @Override @SneakyThrows public byte[] serialize(AlertMessage element) { return objectMapper.writeValueAsBytes(element); } }
За более подробным разъяснением описанных выше абстракций можно обратиться к моей статье «Apache Flink. Как работает дедупликация данных в потоке Kafka-to-Kafka?».
Фильтр событий
Первым бизнес-оператором после Kafka Source является фильтр событий. Это стандартный stateless-оператор, рассматривающий каждое входящее событие независимо от других. Для его реализации достаточно имплементировать интерфейс FilterFunction:
public class TriggerMessageByStatusAndUserFilter implements FilterFunction<TriggerMessage> { private static final long serialVersionUID = 1L; @Override public boolean filter(TriggerMessage message) { return Optional.ofNullable(message.getStatus()) .map(status -> (START.equals(status) || STOP.equals(status)) && message.getUserId() != null && isNotBlank(message.getTriggerName())) .orElse(false); } }
В нашем случае мы должны пропускать только TriggerMessage, у которых есть статус START или STOP, id клиента и конкретное название триггера.
Оператор создания уведомления по таймеру
Основным компонентом пайплайна обработки TriggerMessage является оператор, который будет генерировать AlertMessage по условиям из технического задания. Алгоритм его работы:
-
получаем TriggerMessage;
-
в разрезе ключа (trigger_name + user_id) выполняем действия:
-
если статус TriggerMessage равен START, то создаем таймер на 10 минут, по истечении которого генерируем AlertMessage на основе пришедшего START события;
-
если статус TriggerMessage равен STOP, то удаляем созданный в пункте выше таймер. Если подобного сообщения еще не было (в разрезе ключа), то ничего страшного.
-
Для реализации такого оператора нам понадобятся знания в двух темах: Flink-таймерах и состоянии Keyed State. Я их разобрал в прошлой части цикла.
Таймеры позволяют запустить логику через определенное время. При этом отсчет может идти как от текущего времени самой машины, выполняющей задание Flink (processing time), так и от времени прошедшего события (event time). Важно помнить, что таймеры доступны только для потока Keyed Stream.
Доступ к таймерам можно получить из абстракции оператора Flink ProcessFunction. В нашем случае мы имеем дело с событиями в разрезе ключа (trigger_name + user_id), поэтому нам заранее нужно разделить поток по этому ключу с помощью метода .keyBy(). Так все события с одинаковым ключом попадут на один и тот же параллельный инстанс следующего оператора создания уведомлений, ведь в нем будем использовать KeyedState.
Указать ключ разделения можно либо с помощью лямбды при построении самого пайплайна, либо использовать имплементацию интерфейса KeySelector. Воспользуемся вторым вариантом. Поместим эту реализацию внутрь нашего нового оператора в качестве внутреннего статического класса, так как он неразрывно связан с оператором отправки уведомлений:
public static class TriggerAlertProcessorKeySelector implements KeySelector<TriggerMessage, String> { private static final long serialVersionUID = 1L; @Override public String getKey(TriggerMessage message) { return message.getUserId() + message.getTriggerName(); } }
Теперь поговорим о самом операторе отправки уведомлений. Как я писал выше, нам нужна реализация класса KeyedProcessFunction. Она предоставляет доступ к контексту Flink, через который можно получить доступ к состояниям и таймерам.
В общем случае имплементация KeyedProcessFunction выглядит так:
public class TriggerAlertProcessor<K, I, O> extends KeyedProcessFunction<K, I, O> { @Override public void open(Configuration parameters) { // ... } @Override public void processElement(O value, KeyedProcessFunction<I, O, K>.Context ctx, Collector<K> out) throws Exception { // ... } @Override public void onTimer(long timestamp, KeyedProcessFunction<I, O, K>.OnTimerContext ctx, Collector<K> out) throws Exception { // ... } }
При обработке событий в методе processElement объект Context предоставляет доступ к элементу TimerService. Он умеет работать с метками времени: временем событий (для срабатывания триггера будет использован водяной знак Watermark) и реальным временем машины, где работает оператор. Через TimerService можно создать таймер, который запустится в определенное время в будущем.
KeyedProcessFunction также предоставляет метод onTimer() — именно он будет вызываться при срабатывании таймера. Во время этого вызова все состояния снова привязаны к ключу, с которым был создан таймер. Это позволяет таймерам управлять состоянием с ключом.
Важные замечания по поводу таймеров описаны в документации (перевод):
«Оба типа таймеров (время обработки и время события) обслуживаются внутри TimerService и помещаются в очередь для выполнения. TimerService дедуплицирует таймеры для каждого ключа и временной метки, т. е. существует не более одного таймера для каждого ключа и временной метки. Если для одной и той же временной метки зарегистрировано несколько таймеров, onTimer() метод будет вызываться только один раз. Flink синхронизирует вызовы onTimer() и processElement(). Следовательно, пользователям не нужно беспокоиться об одновременном изменении состояния».
В итоге более детальный план выполнения нашего оператора выглядит следующим образом:
-
Получаем TriggerMessage.
-
Если status события равен START, то с помощью TimerService создаем таймер длительностью десять минут, а также сохраняем состояние ValueState для текущего сообщения. На его основе мы в дальнейшем сможем создать AlertMessage.
-
Если status равен STOP, то удаляем все состояния для текущего ключа и созданный ранее таймер. Flink API позволяет удалить его, передав время срабатывания, поэтому нам дополнительно понадобится состояние для хранения этого времени.
-
onTimer() реализация должна формировать AlertMesage по сохраненному состоянию в пункте 2 и чистить все состояния по текущему ключу.
На основе этого создадим класс AlertState. Он будет отвечать за объекты, сохраненные в состоянии оператора создания уведомлений. Схематично этот класс будет использоваться в следующем преобразовании:

Такой промежуточный класс необходим для:
-
Разделения ответственности между форматом сообщений в Async API (Kafka) и нашим внутренним состоянием. Мы не отдаем наружу внутренний формат хранения данных нашего приложения и можем независимо его менять.
-
Уменьшения данных во внутреннем состоянии. Если бы мы хранили там целиком входящее событие, это бы очень сильно отразилось на быстродействии процессов десериализации и сериализации.
В итоге AlertState будет полностью повторять формат выходного сообщения AlertMessage для нашего простейшего примера джобы:
@Data @Builder @Jacksonized @JsonIgnoreProperties(ignoreUnknown = true) public class AlertState { @JsonPropertyDescription("User id") private UUID userId; @JsonPropertyDescription("Trigger name") private String triggerName; @JsonPropertyDescription("Timestamp") private Long timestamp; }
Теперь посмотрим на реализацию самого оператора уведомлений:
@Slf4j public class TriggerAlertProcessor extends KeyedProcessFunction<String, TriggerMessage, AlertMessage> { private static final long serialVersionUID = 1L; private final Duration stateWaiting; private ValueState<AlertState> alertState; private ValueState<Long> timestampState; private transient MessageTransformer<TriggerMessage, AlertState> messageToStateTransformer; private transient MessageTransformer<AlertState, AlertMessage> stateToMessageTransformer; public TriggerAlertProcessor(@NotNull Duration stateWaiting) { this.stateWaiting = stateWaiting; } @Override public void open(Configuration parameters) { // ... } @Override public void processElement(TriggerMessage message, KeyedProcessFunction<String, TriggerMessage, AlertMessage>.Context ctx, Collector<AlertMessage> out) throws Exception { // ... } @Override public void onTimer(long timestamp, KeyedProcessFunction<String, TriggerMessage, AlertMessage>.OnTimerContext ctx, Collector<AlertMessage> out) throws Exception { // ... } public static class TriggerAlertProcessorKeySelector implements KeySelector<TriggerMessage, String> { // ... } }
Мы видим, что оператор наследует класс KeyedProcessFunction<K, I, O>. В нем:
-
K — формат ключа (String);
-
I — формат входных сообщений (TriggerMessage);
-
O — формат выходных событий (AlertMessage).
Далее нам нужны следующие поля класса:
-
stateWainting — длительность ожидания события STOP по нашему ТЗ. По умолчанию она составляет десять минут, и мы передадим это значение из настроек application.yml непосредственно в конструкторе оператора;
-
alertState — состояние KeyedState, которое хранит AlertState для каждого пришедшего события TriggerMessage со статусом START. В дальнейшем мы будем преобразовать его в AlertMessage;
-
timestampState — состояние KeyedState, хранящее время срабатывания таймера. Это необходимо для возможности его удаления по событию STOP, описанному выше;
-
messageToStateTransformer и stateToMessageTransformer — классы, преобразующие TriggerMessage в состояние AlertState и AlertState в AlertMessage. Их код легко понять: мы напрямую перекладываем соответствующие поля.
Теперь посмотрим на метод open():
@Override public void open(Configuration parameters) { messageToStateTransformer = new TriggerMessageToAlertStateTransformer(); stateToMessageTransformer = new AlertStateToAlertMessageTransformer(); final var defaultTtlConfig = StateTtlConfig .newBuilder(Time.minutes(stateWaiting.toMillis() + 1000)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(NeverReturnExpired) .cleanupFullSnapshot() .build(); // alert state final var alertDescriptor = new ValueStateDescriptor<>("alertState", AlertState.class); alertDescriptor.enableTimeToLive(defaultTtlConfig); alertState = getRuntimeContext().getState(alertDescriptor); // timestamp state final var timestampDescriptor = new ValueStateDescriptor<>("timestampState", Types.LONG); timestampDescriptor.enableTimeToLive(defaultTtlConfig); timestampState = getRuntimeContext().getState(timestampDescriptor); }
Как я упоминал в предыдущих статьях, в основном он необходим для инициализации объекта после его десериализации в слоте TaskManager перед непосредственным началом обработки сообщений (инициализацией transient-полей, подготовкой обработки событий и так далее).
В нашей реализации мы инициализируем:
-
Трансформеры сообщений: TriggerMessage → AlertState и AlertState → AlertMessage.
-
Конфигурацию TTL, где время ttl немного больше, чем время ожидания в 10 минут.
-
Состояние для AlertState и Timestamp: для этого передаем названия и типы хранящихся данных. Обратите внимание на важный момент: для инициализации состояний мы используем указание классов напрямую, хотя есть возможность указать сериализатор. Об этом я подробнее расскажу в следующих статьях.
Теперь оператор настроен и готов к обработке событий. Она производится в методе processElement, куда поступают TriggerMessage, ссылка до контекста Flink, а также ссылка out для добавления в выходной поток событий из оператора.
@Override public void processElement(TriggerMessage message, KeyedProcessFunction<String, TriggerMessage, AlertMessage>.Context ctx, Collector<AlertMessage> out) throws Exception { final var status = message.getStatus(); if (START.equals(status)) { // create timer final var state = messageToStateTransformer.transform(message); alertState.update(state); final var invokeTimerMillis = ctx.timerService().currentProcessingTime() + stateWaiting.toMillis(); final var previousTimestamp = timestampState.value(); if (previousTimestamp != null) { ctx.timerService().deleteProcessingTimeTimer(previousTimestamp); } ctx.timerService().registerProcessingTimeTimer(invokeTimerMillis); timestampState.update(invokeTimerMillis); } else if (STOP.equals(status)) { // remove timer final var invokeTimerMillis = timestampState.value(); if (invokeTimerMillis != null) { ctx.timerService().deleteProcessingTimeTimer(invokeTimerMillis); timestampState.clear(); } alertState.clear(); } else { log.debug("Unknown trigger status {} for key {}", status, ctx.getCurrentKey()); } }
Взглянем на внутренности метода более подробно:
-
Определили статус события — START или STOP. Остальные типы к нам не придут: вспоминаем первый фильтр в нашем пайплайне. Текущий оператор знать об этом не должен, поэтому добавляем логирование.
-
В случае START мы трансформируем пришедший триггер в объект состояния AlertState и сохраняем его в это состояние, а далее создаем сам таймер:
final var invokeTimerMillis = ctx.timerService().currentProcessingTime() + stateWaiting.toMillis();
Этой строкой берем текущее время машины, где выполняется задание Flink и прибавляем ему десять минут по бизнес-условию. В итоге получаем время срабатывания таймера. Если таймер для текущего ключа (trigger_name + user_id) уже есть, то удаляем его и просто создаем новый по вычисленному времени. Также запоминаем это время отдельно в состоянии timestampState.
-
В случае STOP проверяем, существует ли в состоянии хоть какое-то время. Если оно есть, то мы создавали таймер в START блоке. Удаляем его и чистим все состояния для текущего ключа события.
Цель метода processElement — создавать таймеры и чистить состояния по бизнес-сценариям. Заметьте, что мы нигде в этом методе не добавляли события в выходной поток с помощью ссылки out. В выходной поток мы добавим события только при срабатывании таймера. За это отвечает метод onTimer():
@Override public void onTimer(long timestamp, KeyedProcessFunction<String, TriggerMessage, AlertMessage>.OnTimerContext ctx, Collector<AlertMessage> out) throws Exception { final var alertStateValue = alertState.value(); if (alertStateValue != null) { final var alertMessage = stateToMessageTransformer.transform(alertStateValue); out.collect(alertMessage); } timestampState.clear(); alertState.clear(); }
На входе он получает timestamp срабатывания таймера, все ту же ссылку на Flink, контекст и ссылку out. Взглянем на его реализацию подробнее:
-
Сначала мы достаем состояние AlertState, которое было добавлено на этапе обработке события в processElement.
-
Затем трансформируем его в формат выходного события AlertMessage.
-
В конце очищаем все состояния, потому что они больше не нужны.
Собираем Flink Job
Чтобы гибко настраивать параметры джобы, вынесем длительность таймеров в настройки application.yml (с маппингом на класс TriggerToAlertJobProperties):
jobs: trigger-to-alert-job: enabled: true state-waiting: 10m
Собрав все операторы вместе, получим такую имплементацию нашего класса FlinkJob:
@Component @AllArgsConstructor @ConditionalOnProperty("jobs.trigger-to-alert-job.enabled") public class TriggerToAlertJob extends FlinkJob { private final TriggerToAlertJobProperties properties; private final SourceBinder<TriggerMessage> sourceBinder; private final SinkProvider<AlertMessage> sinkProvider; @Override public void registerJob(StreamExecutionEnvironment env) { sourceBinder.bindSource(env) .filter(new TriggerMessageByStatusAndUserFilter()) .uid("filter_trigger_message_by_status_id").name("filter_trigger_message_by_status") .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector()) .process(new TriggerAlertProcessor(properties.getStateWaiting())) .uid("trigger_alert_processor_id").name("trigger_alert_processor") .sinkTo(sinkProvider.createSink()).uid("sink_alert_message_id").name("sink_alert_message"); } }
В качестве полей класса стандартно используем настройки для этой джобы, поля для получения Kafka Source и генерации Kafka Sink. В самой джобе по нашему начальному пайплайну определяем фильтр, оператор с таймерами и sink. Конечно, не забываем указать удобочитаемые имена и уникальный id для корректного восстановления состояния из savepoint.
В следующих статьях мы проверим работоспособность всего этого тестами. Используем E2E-тесты для основного сценария, unit-тесты для TriggerToAlertJob без Kafka и для отдельных операторов вместе с бизнес-сценариями генерации таймеров. Начнем со стандартных тестов, а далее перейдем к достаточно сложным и неочевидным проблемам, которые могут возникнуть в подобных Flink-джобах.
ссылка на оригинал статьи https://habr.com/ru/articles/839520/
Добавить комментарий