Не за горами выход релиза 4.0 Apache Kafka. Согласно графику релиза, 15 января состоится code freeze, а через пару недель или позже, после стабилизации, версия 4.0 увидит свет. Самое время присмотреться, что же в неё вошло.
Развитие Apache Kafka происходит в рамках процесса процесса работы с KIP’ами, Kafka Improvement Proposals. В релиз 4.0 их вошло 37 штук. Из них наиболее интересной мне показалась доработка, связанная с введением новой концепции, которую разработчики смело сравнили с очередями. Давайте посмотрим, что у них получилось. Остальные доработки, наверное, важны не менее.
Очереди!
«Очередям» в релизе посвящено 2 KIP’а:
Queues for Kafka |
Очереди в kafka |
|
Разработчики наконец признали важность полноценной реализации в Kafka сценария коллективной и конкурентной обработки потребителями событий, то есть без необходимости эксклюзивного назначения доступа к партиции топика. Для реализации данной функциональности, в дополнение к хорошо известным группам потребителей добавляются разделяемые группы (share group) — число потребителей в таких группах может превышать число партиций. Защита доступа к записи обеспечивается механизмом блокировки (по умолчанию на 30с, может быть переопределено свойством Для доступа к топикам посредством разделяемых групп вводится новый интерфейс |
||
Administration of groups |
Администрирование групп |
|
В продолжение предыдущего KIP’а, в связи с расширением номенклатуры типов групп, которое продолжится и далее (см. KIP-1071), был добавлен новый способ получения информации о группах посредством утилиты |
Как обычно, новая функциональность заявлена как экспериментальная, изменяющаяся и не рекомендуется к применению в продуктиве. Использование нового класса, в целом, аналогично старому.
Поддерживаются различные сценарии подтверждения, неявный:
Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "test"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props); consumer.subscribe(Arrays. asList("foo")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration. ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); doProcessing(record); } }
Альтернативно, можно использовать consumer.commitSync()
или consumer.commitAsync()
для передачи подтверждений, но это менее эффективно, поскольку приводит к дополнительным вызовам. Возможно также подтверждение на уровне отдельных записей:
Properties props = new Properties(); props.setProperty("bootstrap. servers", "localhost:9092"); props.setProperty("group.id", "test"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props); consumer.subscribe(Arrays. asList("foo")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { try { doProcessing(record); consumer.acknowledge(record, AcknowledgeType.ACCEPT); } catch (Exception e) { consumer.acknowledge(record, AcknowledgeType.REJECT); break; } } consumer.commitSync(); }
Транзакционное поведение групп определяется свойством group.share.isolation.level
. Оно применяется ко всей группе, а не к отдельному потребителю. Свойство может принимать значение read_committed
или read_uncommitted
(по-умолчанию).
В рамках данного KIP также добавлена новая утилита командной строки kafka-console-share-consumer.sh
. Помимо вышеперечисленных, добавлено множество новых свойств, изменено несколько старых, появились новые API. К сожалению, новой -perf.sh утилиты пока нет.
В общем, это новая, обширная и интересная тема, которая заслуживает подробного разбора.
Что ещё интересного?
Вот ещё несколько улучшений релиза:
allow custom processor wrapping |
Возможность пользовательских расширений потоковых процессоров |
|
Добавлена возможность, наподобие аспектов, добавлять обёртку вокруг процессоров в Streams. Например, для добавления логирования или отладки. В поставке готовых содержательных врапперов нет, но идея интересная. Для включения опции надо задать конфигурацию |
||
Allow Foreign Key Extraction from Both Key and Value in KTable Joins |
Возможность join’ов на основе ключа и значения |
|
Утверждается, что таким образом в некоторых потоковых сценариях можно предотвратить оверхед. |
Add duration based offset reset option for consumer clients |
Добавление возможность указания сдвига на основе смещения по времени |
|
В релизе 3.6 была введена возможность долговременного (многоуровневого) хранения данных (KIP-405, см. подробно тут). С учётом, что теперь данные в кластере могут храниться годы, прежних возможностей по указанию смещения ( |
||
Make remote log manager thread-pool configs dynamic |
Динамическая настройка RemoteLogManager |
|
Если вы уже пользуетесь долговременным хранением, то вас заинтересует возможность налету менять настройки пула потоков RemoteLogManager, системного механизма, находящегося под капотом данной фичи. |
Allow disabling heartbeats replication in MirrorSourceConnector |
Возможность отключения репликации heartbeat-топиков в MirrorSourceConnector |
|
Данное улучшение закрывает потенциально возможную проблему, когда при настройке репликации MirrorMaker 2.0 вы создаёте несколько коннекторов (например, для топиков различными настройками сжатия). Дело в том, что ранее в таком случае каждый из коннекторов безусловно добавлял себе в обработку служебные heartbeat-топики. Теперь их можно явно исключить из репликации. |
||
Allow the replication of user internal topics |
Возможность репликации internal-топиков |
|
До релиза 4.0 топики, название которых заканчивалось на «internal» исключались из репликации. Как оказалось, так могут называться и пользовательские топики с данными, поэтому условие исключение было дополнено, и теперь нереплицируемые топики должны дополнительно начинаться на «mm2». |
||
KIP-724 удалил совместимость с клиентами и брокерами версии 2.1 и ниже. Обновляйтесь!
Также в релизе в модулях broker
и tools
удалена поддержка Java 11.
Важными с точки зрения совместимости кода являются обновления библиотек:
ссылка на оригинал статьи https://habr.com/ru/articles/871540/
Добавить комментарий