Отправка уведомлений по таймеру в Apache Flink

от автора

Привет, Хабр! На связи Александр Бобряков, техлид в команде МТС Аналитики. В предыдущих постах я рассказал, как собрать первое приложение Flink со Spring, реализовав пайплайн дедупликации сообщений Kafka-to-Kafka. В этом примере погружусь в использование таймеров в Flink, а в следующих статьях расскажу, как работать с более сложными состояниями, эволюционировать их схему и покрыть это все тестами.

Весь разбираемый исходный код есть в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии. Эта статья соответствует релизной ветке с названием release/7_Trigger_Flink_Job.

Это восьмой материал из моей серии про Apache Flink. По мере выхода новых ссылки на них будут появляться ниже.

Список моих статей про Flink:
Оглавление статьи:

Как отправить уведомление по таймеру

Рассмотрим новую бизнес-задачу, которую можно решить с использованием Apache Flink.

Источник данных (Source): Kafka-топик trigger-topic, содержащий сообщения-триггеры (TriggerMessage). В них находится информация о человеке, совершившем действие одного из двух типов: START или STOP.

START подразумевает создание триггера, а STOP — завершение. Например, START отправляется, когда пользователь добавил на сайте товар в корзину, а STOP — при оплате.

TriggerMessage представляет собой JSON:

{   "userId": "3ee2e515-2c82-4d5f-b9b9-8da5f54a7da1",   "triggerName": "test_trigger_name",   "status": "START",   "deviceType": "Phone",   "category": "Shop",   "count": 5,   "timestamp": 123,   "data": {     "field_1": "value_1"   } }

Поле status определяет тип триггера.

Задача отправки уведомлений: отслеживать все триггеры и отправлять уведомление AlertMessage, когда триггер запустился и длится дольше десяти минут. Например, пришло TriggerMessage со статусом START, но в течение десяти минут нет сообщения STOP с аналогичным id человека и названием триггера. Такое уведомление сработает, если посетитель бросил заполненную корзину и покинул сайт.

Приемник данных (Sink): Kafka-топик alert-topic, требующий сообщения AlertMessage, образованного путем прямого сопоставления полей из TriggerMessage:

{   "userId": "3ee2e515-2c82-4d5f-b9b9-8da5f54a7da1",   "triggerName": "test_trigger_name",   "timestamp": 123 }

Целевой пайплайн

После построения джобы мы получим такой граф обработки данных:

Пайплайн начинается с Source из Kafka-топика trigger-topic. Затем мы фильтруем сообщения по полю status, пропускаем дальше те, где есть значения START или STOP, а также добавляем проверку на название триггера и наличие id клиента.

Далее пишем оператору, который будет следить за определенными событиями по ключу «название_триггера + id_клиента». При обработке START-сообщения оператор создаст таймер на десять минут, а по завершении отправит AlertMessage в дальнейший Sink — Kafka-топик alert-topic. Если STOP все-таки пришел, то AlertMessage не отправляем.

Построение Flink Job

В этом кейсе построение основных абстракций мало отличается от пайплайна из предыдущих статей про Kafka-to-Kafka с дедупликацией. Я быстро пробегусь по основным абстракциям каркаса пайплайна, так как самые интересные моменты скрыты в операторе, который будет следить за таймерами.

Определение DTO для сообщений

Сообщения на входе представляют собой структуру, соответствующую поставленной задаче:

@Data @Builder @Jacksonized @JsonIgnoreProperties(ignoreUnknown = true) public class TriggerMessage {    @JsonPropertyDescription("User id")    private UUID userId;     @JsonPropertyDescription("Trigger name")    private String triggerName;     @JsonPropertyDescription("Trigger status")    @JsonDeserialize(using = TriggerStatus.Deserializer.class)    private TriggerStatus status;     @JsonPropertyDescription("Device type")    private String deviceType;     @JsonPropertyDescription("Category")    private String category;     @JsonPropertyDescription("Trigger count")    private int count;     @JsonPropertyDescription("Timestamp")    private Long timestamp;     @JsonPropertyDescription("Trigger additional data")    private Map<String, Object> data; }

На выходе ожидаем:

@Data @Builder @Jacksonized @JsonIgnoreProperties(ignoreUnknown = true) public class AlertMessage {    @JsonPropertyDescription("User id")    private UUID userId;     @JsonPropertyDescription("Trigger name")    private String triggerName;     @JsonPropertyDescription("Timestamp")    private Long timestamp; }

Определение настроек Kafka

В настройках Kafka добавятся trigger-topic и alert-topic:

kafka:  group-id: group_id  bootstrap-servers: localhost:29092  topics:    click-topic: 'click-topic'    trigger-topic: 'trigger-topic'    alert-topic: 'alert-topic'

Определение Kafka Source/Sink

Source и Sink имплементируют созданные ранее абстракции SourceBinder и SinkProvider соответственно:

@Component @RequiredArgsConstructor class TriggerMessageKafkaSourceBinder implements SourceBinder<TriggerMessage> {    private final KafkaProperties kafkaProperties;    private final DeserializationSchema<TriggerMessage> deserializationClickMessageSchema;     @Override    public DataStream<TriggerMessage> bindSource(StreamExecutionEnvironment environment) {        final var sourceName = "Trigger message Kafka source";        return environment.fromSource(                KafkaSource.<TriggerMessage>builder()                        .setBootstrapServers(kafkaProperties.getBootstrapServers())                        .setStartingOffsets(OffsetsInitializer.committedOffsets(EARLIEST))                        .setGroupId(kafkaProperties.getGroupId())                        .setTopics(kafkaProperties.getTopics().getTriggerTopic())                        .setValueOnlyDeserializer(deserializationClickMessageSchema)                        .build(),                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),                sourceName        ).name(sourceName).uid(sourceName + "_id");    } }

Разницы с представленным в предыдущих статьях Source для Kafka нет, за исключением используемого топика: создаем Source с помощью вспомогательного класса KafkaSourceBuilder, задаем в нем необходимые параметры, а также кастомный десериализатор сообщений TriggerMessage. Это решение основано на Jackson и реализовывает обязательные методы базового интерфейса Flink DeserializationSchema:

@Component @RequiredArgsConstructor class TriggerMessageDeserializationSchema implements DeserializationSchema<TriggerMessage> {    private static final long serialVersionUID = 1L;     private transient ObjectMapper objectMapper;     @Override    public void open(InitializationContext context) {        objectMapper = createObjectMapper();    }     @Override    public TriggerMessage deserialize(byte[] message) throws IOException {        return objectMapper.readValue(message, TriggerMessage.class);    }     @Override    public boolean isEndOfStream(TriggerMessage nextElement) {        return false;    }     @Override    public TypeInformation<TriggerMessage> getProducedType() {        return TypeInformation.of(TriggerMessage.class);    } }

Аналогично задаем подключение к KafkaSink для alert-topic:

@Component @RequiredArgsConstructor public class AlertMessageKafkaSinkProvider implements SinkProvider<AlertMessage> {    private final KafkaProperties kafkaProperties;    private final SerializationSchema<AlertMessage> serializationProductMessageSchema;     @Override    public Sink<AlertMessage> createSink() {        return KafkaSink.<AlertMessage>builder()                .setBootstrapServers(kafkaProperties.getBootstrapServers())                .setRecordSerializer(KafkaRecordSerializationSchema.<AlertMessage>builder()                        .setTopic(kafkaProperties.getTopics().getAlertTopic())                        .setValueSerializationSchema(serializationProductMessageSchema)                        .build())                .setDeliveryGuarantee(NONE)                .build();    } }

Но используем новый сериализатор схемы AlertMessage:

@Component @RequiredArgsConstructor class AlertMessageSerializationSchema implements SerializationSchema<AlertMessage> {    private static final long serialVersionUID = 1;     private transient ObjectMapper objectMapper;     @Override    public void open(InitializationContext context) {        objectMapper = createObjectMapper();    }     @Override    @SneakyThrows    public byte[] serialize(AlertMessage element) {        return objectMapper.writeValueAsBytes(element);    } }

За более подробным разъяснением описанных выше абстракций можно обратиться к моей статье «Apache Flink. Как работает дедупликация данных в потоке Kafka-to-Kafka?».

Фильтр событий

Первым бизнес-оператором после Kafka Source является фильтр событий. Это стандартный stateless-оператор, рассматривающий каждое входящее событие независимо от других. Для его реализации достаточно имплементировать интерфейс FilterFunction:

public class TriggerMessageByStatusAndUserFilter implements FilterFunction<TriggerMessage> {    private static final long serialVersionUID = 1L;     @Override    public boolean filter(TriggerMessage message) {        return Optional.ofNullable(message.getStatus())                .map(status -> (START.equals(status) || STOP.equals(status))                        && message.getUserId() != null                        && isNotBlank(message.getTriggerName()))                .orElse(false);    } }

В нашем случае мы должны пропускать только TriggerMessage, у которых есть статус START или STOP, id клиента и конкретное название триггера.

Оператор создания уведомления по таймеру

Основным компонентом пайплайна обработки TriggerMessage является оператор, который будет генерировать AlertMessage по условиям из технического задания. Алгоритм его работы:

  1. получаем TriggerMessage;

  2. в разрезе ключа (trigger_name + user_id) выполняем действия:

    • если статус TriggerMessage равен START, то создаем таймер на 10 минут, по истечении которого генерируем AlertMessage на основе пришедшего START события;

    • если статус TriggerMessage равен STOP, то удаляем созданный в пункте выше таймер. Если подобного сообщения еще не было (в разрезе ключа), то ничего страшного.

Для реализации такого оператора нам понадобятся знания в двух темах: Flink-таймерах и состоянии Keyed State. Я их разобрал в прошлой части цикла.

Таймеры позволяют запустить логику через определенное время. При этом отсчет может идти как от текущего времени самой машины, выполняющей задание Flink (processing time), так и от времени прошедшего события (event time). Важно помнить, что таймеры доступны только для потока Keyed Stream.

Доступ к таймерам можно получить из абстракции оператора Flink ProcessFunction. В нашем случае мы имеем дело с событиями в разрезе ключа (trigger_name + user_id), поэтому нам заранее нужно разделить поток по этому ключу с помощью метода .keyBy(). Так все события с одинаковым ключом попадут на один и тот же параллельный инстанс следующего оператора создания уведомлений, ведь в нем будем использовать KeyedState.

Указать ключ разделения можно либо с помощью лямбды при построении самого пайплайна, либо использовать имплементацию интерфейса KeySelector. Воспользуемся вторым вариантом. Поместим эту реализацию внутрь нашего нового оператора в качестве внутреннего статического класса, так как он неразрывно связан с оператором отправки уведомлений:

public static class TriggerAlertProcessorKeySelector implements KeySelector<TriggerMessage, String> {     private static final long serialVersionUID = 1L;      @Override     public String getKey(TriggerMessage message) {         return message.getUserId() + message.getTriggerName();     } }

Теперь поговорим о самом операторе отправки уведомлений. Как я писал выше, нам нужна реализация класса KeyedProcessFunction. Она предоставляет доступ к контексту Flink, через который можно получить доступ к состояниям и таймерам.

В общем случае имплементация KeyedProcessFunction выглядит так:

public class TriggerAlertProcessor<K, I, O> extends KeyedProcessFunction<K, I, O> {     @Override    public void open(Configuration parameters) {       // ...    }     @Override    public void processElement(O value, KeyedProcessFunction<I, O, K>.Context ctx, Collector<K> out) throws Exception {       // ...    }     @Override    public void onTimer(long timestamp, KeyedProcessFunction<I, O, K>.OnTimerContext ctx, Collector<K> out) throws Exception {       // ...    } }

При обработке событий в методе processElement объект Context предоставляет доступ к элементу TimerService. Он умеет работать с метками времени: временем событий (для срабатывания триггера будет использован водяной знак Watermark) и реальным временем машины, где работает оператор. Через TimerService можно создать таймер, который запустится в определенное время в будущем.

KeyedProcessFunction также предоставляет метод onTimer() — именно он будет вызываться при срабатывании таймера. Во время этого вызова все состояния снова привязаны к ключу, с которым был создан таймер. Это позволяет таймерам управлять состоянием с ключом.

Важные замечания по поводу таймеров описаны в документации (перевод):

«Оба типа таймеров (время обработки и время события) обслуживаются внутри TimerService и помещаются в очередь для выполнения. TimerService дедуплицирует таймеры для каждого ключа и временной метки, т. е. существует не более одного таймера для каждого ключа и временной метки. Если для одной и той же временной метки зарегистрировано несколько таймеров, onTimer() метод будет вызываться только один раз. Flink синхронизирует вызовы onTimer() и processElement(). Следовательно, пользователям не нужно беспокоиться об одновременном изменении состояния».

В итоге более детальный план выполнения нашего оператора выглядит следующим образом:

  1. Получаем TriggerMessage.

  2. Если status события равен START, то с помощью TimerService создаем таймер длительностью десять минут, а также сохраняем состояние ValueState для текущего сообщения. На его основе мы в дальнейшем сможем создать AlertMessage.

  3. Если status равен STOP, то удаляем все состояния для текущего ключа и созданный ранее таймер. Flink API позволяет удалить его, передав время срабатывания, поэтому нам дополнительно понадобится состояние для хранения этого времени.

  4. onTimer() реализация должна формировать AlertMesage по сохраненному состоянию в пункте 2 и чистить все состояния по текущему ключу.

На основе этого создадим класс AlertState. Он будет отвечать за объекты, сохраненные в состоянии оператора создания уведомлений. Схематично этот класс будет использоваться в следующем преобразовании:

Такой промежуточный класс необходим для:

  1. Разделения ответственности между форматом сообщений в Async API (Kafka) и нашим внутренним состоянием. Мы не отдаем наружу внутренний формат хранения данных нашего приложения и можем независимо его менять.

  2. Уменьшения данных во внутреннем состоянии. Если бы мы хранили там целиком входящее событие, это бы очень сильно отразилось на быстродействии процессов десериализации и сериализации.

В итоге AlertState будет полностью повторять формат выходного сообщения AlertMessage для нашего простейшего примера джобы:

@Data @Builder @Jacksonized @JsonIgnoreProperties(ignoreUnknown = true) public class AlertState {    @JsonPropertyDescription("User id")    private UUID userId;     @JsonPropertyDescription("Trigger name")    private String triggerName;     @JsonPropertyDescription("Timestamp")    private Long timestamp; }

Теперь посмотрим на реализацию самого оператора уведомлений:

@Slf4j public class TriggerAlertProcessor extends KeyedProcessFunction<String, TriggerMessage, AlertMessage> {    private static final long serialVersionUID = 1L;     private final Duration stateWaiting;    private ValueState<AlertState> alertState;    private ValueState<Long> timestampState;    private transient MessageTransformer<TriggerMessage, AlertState> messageToStateTransformer;    private transient MessageTransformer<AlertState, AlertMessage> stateToMessageTransformer;     public TriggerAlertProcessor(@NotNull Duration stateWaiting) {        this.stateWaiting = stateWaiting;    }     @Override    public void open(Configuration parameters) {       // ...    }     @Override    public void processElement(TriggerMessage message,                               KeyedProcessFunction<String, TriggerMessage, AlertMessage>.Context ctx,                               Collector<AlertMessage> out) throws Exception {       // ...    }     @Override    public void onTimer(long timestamp,                        KeyedProcessFunction<String, TriggerMessage, AlertMessage>.OnTimerContext ctx,                        Collector<AlertMessage> out) throws Exception {       // ...    }     public static class TriggerAlertProcessorKeySelector implements KeySelector<TriggerMessage, String> {       // ...    } }

Мы видим, что оператор наследует класс KeyedProcessFunction<K, I, O>. В нем:

  • K — формат ключа (String);

  • I — формат входных сообщений (TriggerMessage);

  • O — формат выходных событий (AlertMessage).

Далее нам нужны следующие поля класса:

  • stateWainting — длительность ожидания события STOP по нашему ТЗ. По умолчанию она составляет десять минут, и мы передадим это значение из настроек application.yml непосредственно в конструкторе оператора;

  • alertState — состояние KeyedState, которое хранит AlertState для каждого пришедшего события TriggerMessage со статусом START. В дальнейшем мы будем преобразовать его в AlertMessage;

  • timestampState — состояние KeyedState, хранящее время срабатывания таймера. Это необходимо для возможности его удаления по событию STOP, описанному выше;

  • messageToStateTransformer и stateToMessageTransformer — классы, преобразующие TriggerMessage в состояние AlertState и AlertState в AlertMessage. Их код легко понять: мы напрямую перекладываем соответствующие поля.

Теперь посмотрим на метод open():

@Override    public void open(Configuration parameters) {        messageToStateTransformer = new TriggerMessageToAlertStateTransformer();        stateToMessageTransformer = new AlertStateToAlertMessageTransformer();        final var defaultTtlConfig = StateTtlConfig                .newBuilder(Time.minutes(stateWaiting.toMillis() + 1000))                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)                .setStateVisibility(NeverReturnExpired)                .cleanupFullSnapshot()                .build();         // alert state        final var alertDescriptor = new ValueStateDescriptor<>("alertState", AlertState.class);        alertDescriptor.enableTimeToLive(defaultTtlConfig);        alertState = getRuntimeContext().getState(alertDescriptor);         // timestamp state        final var timestampDescriptor = new ValueStateDescriptor<>("timestampState", Types.LONG);        timestampDescriptor.enableTimeToLive(defaultTtlConfig);        timestampState = getRuntimeContext().getState(timestampDescriptor);    }

Как я упоминал в предыдущих статьях, в основном он необходим для инициализации объекта после его десериализации в слоте TaskManager перед непосредственным началом обработки сообщений (инициализацией transient-полей, подготовкой обработки событий и так далее).

В нашей реализации мы инициализируем:

  1. Трансформеры сообщений: TriggerMessage → AlertState и AlertState →  AlertMessage.

  2. Конфигурацию TTL, где время ttl немного больше, чем время ожидания в 10 минут.

  3. Состояние для AlertState и Timestamp: для этого передаем названия и типы хранящихся данных. Обратите внимание на важный момент: для инициализации состояний мы используем указание классов напрямую, хотя есть возможность указать сериализатор. Об этом я подробнее расскажу в следующих статьях.

Теперь оператор настроен и готов к обработке событий. Она производится в методе processElement, куда поступают TriggerMessage, ссылка до контекста Flink, а также ссылка out для добавления в выходной поток событий из оператора.

@Override    public void processElement(TriggerMessage message,                               KeyedProcessFunction<String, TriggerMessage, AlertMessage>.Context ctx,                               Collector<AlertMessage> out) throws Exception {        final var status = message.getStatus();        if (START.equals(status)) {            // create timer            final var state = messageToStateTransformer.transform(message);            alertState.update(state);            final var invokeTimerMillis = ctx.timerService().currentProcessingTime() + stateWaiting.toMillis();            final var previousTimestamp = timestampState.value();            if (previousTimestamp != null) {                ctx.timerService().deleteProcessingTimeTimer(previousTimestamp);            }            ctx.timerService().registerProcessingTimeTimer(invokeTimerMillis);            timestampState.update(invokeTimerMillis);        } else if (STOP.equals(status)) {            // remove timer            final var invokeTimerMillis = timestampState.value();            if (invokeTimerMillis != null) {                ctx.timerService().deleteProcessingTimeTimer(invokeTimerMillis);                timestampState.clear();            }            alertState.clear();        } else {            log.debug("Unknown trigger status {} for key {}", status, ctx.getCurrentKey());        }    }

Взглянем на внутренности метода более подробно:

  1. Определили статус события — START или STOP. Остальные типы к нам не придут: вспоминаем первый фильтр в нашем пайплайне. Текущий оператор знать об этом не должен, поэтому добавляем логирование.

  2. В случае START мы трансформируем пришедший триггер в объект состояния AlertState и сохраняем его в это состояние, а далее создаем сам таймер:

final var invokeTimerMillis = ctx.timerService().currentProcessingTime() + stateWaiting.toMillis();

Этой строкой берем текущее время машины, где выполняется задание Flink и прибавляем ему десять минут по бизнес-условию. В итоге получаем время срабатывания таймера. Если таймер для текущего ключа (trigger_name + user_id) уже есть, то удаляем его и просто создаем новый по вычисленному времени. Также запоминаем это время отдельно в состоянии timestampState.

  1. В случае STOP проверяем, существует ли в состоянии хоть какое-то время. Если оно есть, то мы создавали таймер в START блоке. Удаляем его и чистим все состояния для текущего ключа события.

Цель метода processElement — создавать таймеры и чистить состояния по бизнес-сценариям. Заметьте, что мы нигде в этом методе не добавляли события в выходной поток с помощью ссылки out. В выходной поток мы добавим события только при срабатывании таймера. За это отвечает метод onTimer():

@Override    public void onTimer(long timestamp,                        KeyedProcessFunction<String, TriggerMessage, AlertMessage>.OnTimerContext ctx,                        Collector<AlertMessage> out) throws Exception {        final var alertStateValue = alertState.value();        if (alertStateValue != null) {            final var alertMessage = stateToMessageTransformer.transform(alertStateValue);            out.collect(alertMessage);        }        timestampState.clear();        alertState.clear();    }

На входе он получает timestamp срабатывания таймера, все ту же ссылку на Flink, контекст и ссылку out. Взглянем на его реализацию подробнее:

  1. Сначала мы достаем состояние AlertState, которое было добавлено на этапе обработке события в processElement.

  2. Затем трансформируем его в формат выходного события AlertMessage.

  3. В конце очищаем все состояния, потому что они больше не нужны.

Собираем Flink Job

Чтобы гибко настраивать параметры джобы, вынесем длительность таймеров в настройки application.yml (с маппингом на класс TriggerToAlertJobProperties):

jobs:  trigger-to-alert-job:    enabled: true    state-waiting: 10m

Собрав все операторы вместе, получим такую имплементацию нашего класса FlinkJob:

@Component @AllArgsConstructor @ConditionalOnProperty("jobs.trigger-to-alert-job.enabled") public class TriggerToAlertJob extends FlinkJob {    private final TriggerToAlertJobProperties properties;    private final SourceBinder<TriggerMessage> sourceBinder;    private final SinkProvider<AlertMessage> sinkProvider;     @Override    public void registerJob(StreamExecutionEnvironment env) {        sourceBinder.bindSource(env)                .filter(new TriggerMessageByStatusAndUserFilter())                .uid("filter_trigger_message_by_status_id").name("filter_trigger_message_by_status")                .keyBy(new TriggerAlertProcessor.TriggerAlertProcessorKeySelector())                .process(new TriggerAlertProcessor(properties.getStateWaiting()))                .uid("trigger_alert_processor_id").name("trigger_alert_processor")                .sinkTo(sinkProvider.createSink()).uid("sink_alert_message_id").name("sink_alert_message");    } }

В качестве полей класса стандартно используем настройки для этой джобы, поля для получения Kafka Source и генерации Kafka Sink. В самой джобе по нашему начальному пайплайну определяем фильтр, оператор с таймерами и sink. Конечно, не забываем указать удобочитаемые имена и уникальный id для корректного восстановления состояния из savepoint.

В следующих статьях мы проверим работоспособность всего этого тестами. Используем E2E-тесты для основного сценария, unit-тесты для TriggerToAlertJob без Kafka и для отдельных операторов вместе с бизнес-сценариями генерации таймеров. Начнем со стандартных тестов, а далее перейдем к достаточно сложным и неочевидным проблемам, которые могут возникнуть в подобных Flink-джобах.


ссылка на оригинал статьи https://habr.com/ru/articles/839520/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *