Архитектура Apache Flink (бегло)
Скорее всего читатели знают, но контекста ради.
Flink имеет децентрализованный дизайн с распределенной архитектурой, где набор контейнеров ( Task Manager ) несут ответственность за свою локальную зону или не несут???
Эта зона в случае разбиения через keyBy размазывается исходя из хэша заданного ключа, посредством чего, Flink гарантирует попадание всех событий с этим ключом на тот же контейнер и также гарантирует сохранение того порядка событий, в котором они доехали до оператора, и, как следствие этот контейнер является мастером-владельцем ключа — отсюда понятие локальной зоны становится оправданным, ведь все хэш пространство размазывается на контейнеры равномерно. Однако, если не использовать keyBy и просто попробовать вызвать что угодно, то будет round-robin распределение и тогда снимается понятие локальной зоны ответственности.
Проблема нюанса с update.
Если в таблице domains поле user_id изменяемое, то давайте представим такой пример:
1. Было domains.id = 1, domains.user_id = 1
2. Происходит обновление и меняется domains.id = 1, domains.user_id = 2
3. Мы получаем в потоке одно событие domains.id = 1, domains.user_id = 2, delete = false
4. Это событие ( с какой то вероятностью ) попадает на другой контейнер, мэтчится с пользователем user.id = 2, заполняет state этим событием и отправляется дальше по потоку обновляя СП.
Но! Событие или уже объект, связанный с domains.id = 1, domains.user_id = 1 остается в state на другом контейнере и хотелось бы, чтобы события могли за собой убирать.
Второй пример:
1. Было domains.id = 1, domains.user_id = 1
2. Происходит обновление и меняется domains.id = 2, domains.user_id = 2 — да вот так вот сразу обновился первичный ключ и пользак сразу!
3. Мы получаем в потоке одно событие domains.id = 2, domains.user_id = 2, delete = false
4. Это событие ( с какой то вероятностью ) попадает на другой контейнер, мэтчится с пользователем user.id = 2, заполняет state этим событием и отправляется дальше по потоку обновляя СП неправильно! СП дедуплицируется по domains.id и новое событие domains.id = 2 до него дойдет, но старое domains.id = 1 не удалится. В добавок мы все также забыли почистить за собой.
Решение нюанса с update.
Оказывается, что есть серебрянная пуля решающее все.
Достаточно во время прихода событий update делить их на delete + insert.
В CDC update событии согласно Debezium протоколу сообщение имеет заполненные before и after.
Итоговое решение для Domain
javapublic class Domain implements Serializable { public Integer id; public Integer user_id; public String domain_name; public boolean delete; // getters and setters omitted public static Domain[] fromRow(Row row) { Character op = (Character) row.getField(1); if (op == null) { throw new IllegalStateException("Never should happen, if Debezium feels fine"); } if (op == 'd') { Row before = (Row) row.getField(0); return new Domain[]{ new Domain(before.getField(0), before.getField(1), before.getField(2), true) }; } else if (op == 'u') { Row before = (Row) row.getField(0); Row after = (Row) row.getField(2); return new Domain[]{ new Domain(before.getField(0), before.getField(1), before.getField(2), true), new Domain(after.getField(0), after.getField(1), after.getField(2), false) }; } else { Row after = (Row) row.getField(2); return new Domain[]{ new Domain(after.getField(0), after.getField(1), after.getField(2), false) }; } }}
Обратите внимание на порядок — сначала delete и потом insert. Далее оно в таком же порядке доходит до Join оператора из-за гарантий keyBy и помечается удаленным в state ( при желании можно удалять ) и отправляется для старого domain.id = 1 в СП с пометкой удаления. В случае insert + delete свежим событием было бы удаление на всех уровнях.
Итого:
-
Имеем realtime поток, умеющий работать со всеми CRUD операциями.
-
State не имеющий избыточности.
-
Умеем работать с изменяемым ключом джоина.
-
Умеем работать с изменяемым первичным ключом на СИ.
Следующие шаги:
-
Выстроить хронологию событий — что если сначала придет в кафку второе событие и потом первое?
-
СП — как правильно выбрать?
ссылка на оригинал статьи https://habr.com/ru/articles/1022368/