Join таблиц в реальном времени на Apache Flink ( Часть 3 )

от автора

Архитектура Apache Flink (бегло)

Скорее всего читатели знают, но контекста ради.
Flink имеет децентрализованный дизайн с распределенной архитектурой, где набор контейнеров ( Task Manager ) несут ответственность за свою локальную зону или не несут???
Эта зона в случае разбиения через keyBy размазывается исходя из хэша заданного ключа, посредством чего, Flink гарантирует попадание всех событий с этим ключом на тот же контейнер и также гарантирует сохранение того порядка событий, в котором они доехали до оператора, и, как следствие этот контейнер является мастером-владельцем ключа — отсюда понятие локальной зоны становится оправданным, ведь все хэш пространство размазывается на контейнеры равномерно. Однако, если не использовать keyBy и просто попробовать вызвать что угодно, то будет round-robin распределение и тогда снимается понятие локальной зоны ответственности.

Проблема нюанса с update.

Если в таблице domains поле user_id изменяемое, то давайте представим такой пример:
1. Было domains.id = 1, domains.user_id = 1
2. Происходит обновление и меняется domains.id = 1, domains.user_id = 2
3. Мы получаем в потоке одно событие domains.id = 1, domains.user_id = 2, delete = false
4. Это событие ( с какой то вероятностью ) попадает на другой контейнер, мэтчится с пользователем user.id = 2, заполняет state этим событием и отправляется дальше по потоку обновляя СП.
Но! Событие или уже объект, связанный с domains.id = 1, domains.user_id = 1 остается в state на другом контейнере и хотелось бы, чтобы события могли за собой убирать.

Второй пример:
1. Было domains.id = 1, domains.user_id = 1
2. Происходит обновление и меняется domains.id = 2, domains.user_id = 2 — да вот так вот сразу обновился первичный ключ и пользак сразу!
3. Мы получаем в потоке одно событие domains.id = 2, domains.user_id = 2, delete = false
4. Это событие ( с какой то вероятностью ) попадает на другой контейнер, мэтчится с пользователем user.id = 2, заполняет state этим событием и отправляется дальше по потоку обновляя СП неправильно! СП дедуплицируется по domains.id и новое событие domains.id = 2 до него дойдет, но старое domains.id = 1 не удалится. В добавок мы все также забыли почистить за собой.

Решение нюанса с update.

Оказывается, что есть серебрянная пуля решающее все.
Достаточно во время прихода событий update делить их на delete + insert.
В CDC update событии согласно Debezium протоколу сообщение имеет заполненные before и after.
Итоговое решение для Domain

javapublic class Domain implements Serializable {  public Integer id;  public Integer user_id;  public String domain_name;  public boolean delete;  // getters and setters omitted  public static Domain[] fromRow(Row row) {        Character op = (Character) row.getField(1);        if (op == null) {            throw new IllegalStateException("Never should happen, if Debezium feels fine");        }        if (op == 'd') {            Row before = (Row) row.getField(0);                return new Domain[]{              new Domain(before.getField(0), before.getField(1), before.getField(2), true)            };        } else if (op == 'u') {            Row before = (Row) row.getField(0);            Row after = (Row) row.getField(2);                return new Domain[]{              new Domain(before.getField(0), before.getField(1), before.getField(2), true),              new Domain(after.getField(0), after.getField(1), after.getField(2), false)            };        } else {            Row after = (Row) row.getField(2);                return new Domain[]{              new Domain(after.getField(0), after.getField(1), after.getField(2), false)            };        }    }}

Обратите внимание на порядок — сначала delete и потом insert. Далее оно в таком же порядке доходит до Join оператора из-за гарантий keyBy и помечается удаленным в state ( при желании можно удалять ) и отправляется для старого domain.id = 1 в СП с пометкой удаления. В случае insert + delete свежим событием было бы удаление на всех уровнях.

Итого:

  1. Имеем realtime поток, умеющий работать со всеми CRUD операциями.

  2. State не имеющий избыточности.

  3. Умеем работать с изменяемым ключом джоина.

  4. Умеем работать с изменяемым первичным ключом на СИ.

Следующие шаги:

  1. Выстроить хронологию событий — что если сначала придет в кафку второе событие и потом первое?

  2. СП — как правильно выбрать?

Вещаю об NRT-RT

Вещаю об NRT-RT

ссылка на оригинал статьи https://habr.com/ru/articles/1022368/