Join таблиц в реальном времени на Apache Flink ( Часть 2 )

от автора

Доработка условия Inner Join

Ранее в примере в InnerJoinFunction мы отправляли данные дальше по потоку независимо от CRUD операции, которая была применена на нашу запись — мы просто при приходе новой записи из какой либо таблицы всегда отправляли новую запись в СП, но ведь если придет сообщение, уведомляющее об удалении записи, то необходимо будет запись удалить на СП и если взаимосвязь таблиц один ко многим, то придется удалить много записей.

Структура Debezium сообщения

Мы работаем с кафкой и принимаем сообщения, генерируемые Debezium в данном примере. Его структура приблизительно такая при простейшей конфигурации ( бОльшей нам и не надо ).

{ "op": "(c|r|u|d)", "source": { ... }, "ts_ms" : "...", "ts_us" : "...", "ts_ns" : "...", "before" : [Data, null], "after" : [Data, null] }

Где Data это модель данных.
В зависимости от операции "op" поля before и after принимают значение, описанное в модели данных либо null . Если вкратце, то при:

op = 'c' -> before = null, after = Data -> insert op = 'r' -> before = null, after = Data -> initial snapshot if set op = 'u' -> before = Data, after = Data -> update op = 'd' -> before = Data, after = null -> delete

Доработка маппера класса модели данных

public class Domain implements Serializable {   public Integer id;   public Integer user_id;   public String domain_name;   public boolean delete;    // getters and setters omitted    public static Domain fromRow(Row row) {         Character op = (Character) row.getField(1);          if (op == null) {             throw new IllegalStateException("Never should happen, if Debezium feels fine");         }          Row domain = (Row) row.getField(op == 'd' ? 0 : 2);          Integer id = (Integer) domain.getField(0);         Integer user_id = (Integer) domain.getField(1);         Integer domain_name = (Integer) domain.getField(2);          return new Domain(id, user_id, domain_name);     } }

При:
1. op = c или r берем то, что в after.
2. op = d берем всегда before.
3. op = u берем всегда тоже after, однако есть нюанс, о котором расскажу в следующей статье.

С моделью User по аналогии с Domain.

Доработка InnerJoinFunction

import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.*; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.util.Collector;  import java.io.Serializable;  public class InnerJoinFunction extends KeyedCoProcessFunction<Integer, User, Domain, InnerJoinFunction.Output> {     private MapState<Integer, Domain> domainsState;     private ValueState<User> usersState;      @Override     public void processElement1(final User user, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception {         usersState.update(user);                for (final Domain domain : domainsState.values()) {             out.collect(new InnerJoinFunction.Output(                     user.id,                     user.firstname,                     user.lastname,                     domain.domain_name,                     user.delete || domain.delete             ));         }     }      @Override     public void processElement2(final Domain domain, final Context ctx, final Collector<InnerJoinFunction.Output> out) throws Exception {         domainsState.put(domain.id, domain);          final User user = usersState.value();          if (user != null) {             out.collect(new InnerJoinFunction.Output(                     user.id,                     user.firstname,                     user.lastname,                     domain.domain_name,                     user.delete || domain.delete             ));         }     }      @Override     public void open(OpenContext openContext) throws Exception {         var usersStateDescriptor = new ValueStateDescriptor<>(                 "users",                 User.class         );         var domainsStateDescriptor = new MapStateDescriptor<>(                 "domains",                 Integer.class,                 Domain.class         );         usersState = getRuntimeContext().getState(usersStateDescriptor);         domainsState = getRuntimeContext().getMapState(domainsStateDescriptor);          super.open(openContext);     }      public static class Output implements Serializable {         public Integer user_id;         public String firstname;         public String lastname;         public String domain_name;         public boolean delete;                  // getters and setters omitted     } }

Такой подход будет успешно работать со всеми CRUD операциями и выполнять inner join условие.

Нюанс с update

Если ключ join изменяется ( любая из колонок одной из любой таблиц ), то данный подход работать не будет из за локальности хранения state движком Flink, в нашем примере ключом join является поле user_id в обеих таблицах и если в одной из них оно изменяется, то этот подход в при операциях update над user_id будет ломать логику.

В 3 части будет подробный разбор решения это проблемы.


ссылка на оригинал статьи https://habr.com/ru/articles/908220/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *