Удобная синхронизация настроек Kafka

от автора

Если вы настроили многоузловой кластер Kafka, то, вероятно, знаете, что в нем есть части конфигурации, общие для кластера, а есть уникальные для каждого узла.

В этой заметке я описываю свой способ проведения централизованного обновления конфигурации брокеров.

Поменяли на одном брокере — настройки применились везде.

Bourne again shell. Погнали!

Синхронизировать настройки будем при помощи rsync+ssh. Подключение к узлам будет под тем же служебным пользователем, под которым работает ПО Kafka.

Но перед синхронизацией необходимо разделить конфигурационный файл server.properties на две отдельные составляющие — server.properties.uniq и server.properties.common.

server.properties.uniq — настройки уникальные для каждого брокера.

server.properties.common — настройки общие для каждого брокера.

Полный конфигурационный файл брокера можно не читать, там много настроек, которые не являются предметом заметки, я его приведу в свёрнутом виде.

server.properties
############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.broker.id=1broker.rack=DC1advertised.listeners=SASL_SSL://yamaha1.bercut.com:9093advertised.host.name=yamaha1.bercut.comlisteners=SASL_SSL://yamaha1.bercut.com:9093#Kerberoslistener.name.sasl_ssl.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true \                              keyTab="/opt/kafka-secret/kafka-server-yamaha1.keytab" principal="KAFKA/yamaha1.bercut.com";############################# End of Server Basics ################################################### Zookeeper #################################zookeeper.connection.timeout.ms=18000zookeeper.connect=yamaha1.bercut.com:2182,yamaha2.bercut.com:2182,yamaha3.bercut.com:2182## Properties for SSL Zookeeper Security between Zookeeper and Brokerzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNettyzookeeper.ssl.client.enable=truezookeeper.ssl.protocol=TLSv1.2zookeeper.ssl.truststore.location=/opt/kafka-config/ssl/kafka/zookeeper-client.truststore.jkszookeeper.ssl.truststore.password=ChangeThisZKeySecretzookeeper.ssl.keystore.location=/opt/kafka-config/ssl/kafka/zookeeper-client.keystore.jkszookeeper.ssl.keystore.password=ChangeThisZKeySecretzookeeper.set.acl=true############################ SSL and SASL settings ############################### put this line if your certificate does not contain FQDN#ssl.endpoint.identification.algorithm=ssl.keystore.location=/opt/kafka-config/ssl/kafka/kafka.server.keystore.jksssl.keystore.password=ChangeThisKKeySecretssl.key.password=ChangeThisKKeySecretssl.truststore.location=/opt/kafka-config/ssl/kafka/kafka.server.truststore.jksssl.truststore.password=ChangeThisKKeySecretssl.protocol=TLSv1.2security.inter.broker.protocol=SASL_SSL##ssl.client.auth=requiredssl.client.auth=none##Properties for SASL beetween the brokers and clients#sasl.enabled.mechanisms=SCRAM-SHA-512sasl.enabled.mechanisms=SCRAM-SHA-512,GSSAPIsasl.kerberos.service.name=kafkasasl.mechanism.inter.broker.protocol=SCRAM-SHA-512listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="broker-admin" password="ChangeThisAdminSecret";super.users=User:broker-admin##Properties for Authorizationauthorizer.class.name=kafka.security.authorizer.AclAuthorizer# disable this for more security allow.everyone.if.no.acl.found=false############################# Socket Server Settings ############################## Maps listener names to security protocols, the default is for them to be the same. # See the config documentation for more details#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the networknum.network.threads=8# The number of threads that the server uses for processing requests, which may include disk I/Onum.io.threads=16# The send buffer (SO_SNDBUF) used by the socket serversocket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket serversocket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log fileslog.dirs=/data/kafka-logs# The default number of log partitions per topic. More partitions allow greater# parallelism for consumption, but this will also result in more files across# the brokers.num.partitions=2# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.# This value is recommended to be increased for installations with data dirs located in RAID array.num.recovery.threads.per.data.dir=2############################# Internal Topic Settings  ############################## The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.offsets.topic.replication.factor=3transaction.state.log.replication.factor=3transaction.state.log.min.isr=2offsets.topic.min.isr=2############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync# the OS cache lazily. The following configurations control the flush of data to disk.# There are a few important trade-offs here:#    1. Durability: Unflushed data may be lost if you are not using replication.#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.# The settings below allow one to configure the flush policy to flush data after a period of time or# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disklog.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flushlog.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can# be set to delete segments after a period of time, or after a given size has accumulated.# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age#7 days#log.retention.hours=168#4 dayslog.retention.ms=345600000# A size-based retention policy for logs. Segments are pruned from the log unless the remaining# segments drop below log.retention.bytes. Functions independently of log.retention.hours.#1GBlog.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created#1GBlog.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according# to the retention policies#5minlog.retention.check.interval.ms=300000############################# Group Coordinator Settings ############################## The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.# The default value for this is 3 seconds.# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.group.initial.rebalance.delay.ms=3000############################# General Topics Settings #############################delete.topic.enable=falseauto.create.topics.enable=falseconfig.storage.replication.factor=3replica.lag.time.max.ms=2000replica.fetch.wait.max.ms=200min.insync.replicas=2default.replication.factor=3unclean.leader.election.enable = falsenum.replica.fetchers=2############################ consumer local read connections priority ###################replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

Ключевой момент в конфигурации в том, что он разделен на тематические секции, а секция «Server Basics» скриптом автоматически отделится в server.properties.uniq.

Вот как можно сделать сплит. Перед сплитом на всякий случай сохраняем предыдущие версии файлов. Пишем splitcfg.sh.

#!/bin/bashcfg="server.properties"mydir=$(dirname "$0")cd "${mydir}"umask 026if [[ -f "${cfg}.uniq" ]]; then                 echo Saving old ${cfg}.uniq to ${cfg}.uniq.bak                mv ${cfg}.uniq ${cfg}.uniq.bakfiif [[ -f "${cfg}.common" ]]; then                echo Saving old ${cfg}.common to ${cfg}.common.bak                mv ${cfg}.common ${cfg}.common.bak fiuniqtxt=falsewhile read line; do        if [[ "$line" == "############################# Server Basics #############################" ]]; then                uniqtxt=true        fi        if [[ "$uniqtxt" == "true" ]]; then                echo "$line" >> ${cfg}.uniq        else                echo "$line" >> ${cfg}.common        fi        if [[ "$line" == "############################# End of Server Basics ######################" ]]; then                uniqtxt=false        fidone < <(cat "${cfg}")

Отлично, splitcfg.sh готов, также нам понадобится еще один простой скрипт, который склеивает конфигурацию обратно (mergecfg.sh):

#!/bin/bashcfg="server.properties"mydir=$(dirname "$0")cd "${mydir}"umask 026cat ${cfg}.uniq ${cfg}.common > $cfg

splitcfg.sh и mergecfg.sh раскладываем на все брокеры.

Теперь нужно сделать выборочную синхронизацию файлов с другими брокерами.

Комментарии по коду:

#!/bin/bash# Список всех брокеров. # Скрипт сам поймет, с кем нужно делать синхронизацию, а с кем нет (с собой не надо).CLUSTER_HOSTS=(10.1.1.1 10.1.1.2 10.1.1.3)# Пользователь, под которым будет идти синхронизация по rsync+sshkafkauser="kafka"# В моем случае бинарные файлы kafka лежат по пути  /opt/kafka_версия, # а симлинк /opt/kafka указывает на него (для простоты обновления версии).# Обределяем путь до папки kafka.kafkadir="$(readlink -f /opt/kafka)"# SSH ключ, под которым будет идти синхронизация по rsync+sshssh_key="/home/${kafkauser}/.ssh/id_ed25519"# Временная папка для синхронизации.tmp_dir="/tmp/kafka-sync"# Флажок интерективности. Скрипт синхронизации позволяет администратору сравнить# что поменялось перед перезаписью файлов.interactive="true"# Список папок для синхронизации.file_list="/opt/jmx_prometheus_javaagent/opt/kafka/opt/kafka-config${kafkadir}/bin${kafkadir}/config${kafkadir}/libs${kafkadir}/LICENSE${kafkadir}/licenses${kafkadir}/NOTICE${kafkadir}/site-docs"# Список исключений. # Не нужно синхронизировать уникальные SSL сертификаты брокеров Kafka# и узлов Zookeeper.# Не нужно синхронизировать server.properties, server.properties.uniq и его бэкап.exclude_file_list="${kafkadir}/logs/*/opt/kafka-config/ssl/opt/kafka-config/ssl/zookeeper/opt/kafka-config/ssl/kafka/opt/kafka-config/server.properties.uniq/opt/kafka-config/server.properties/opt/kafka-config/server.properties.uniq.bak "# Перед каждым элементом списка исключений добавить '--exclude=', # это необходимо по синтаксису rsync.exclude_file_list=$(echo "${exclude_file_list}" | sed 's|^| --exclude=|g')# Разделяем конфигурацию на уникальную и общую (splitcfg.sh приведен выше).echo "Spliting server.properties"/opt/kafka-config/splitcfg.sh# Определяем собственный IP адрес, формируем список хостов для синхронизации # HOSTS_TO_SYNC, не добавляя в него самого себя.my_ip=$(/sbin/ip add | sed -n '/127.0.0.1/d; s/.*inet \([0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\).*/\1/p;')for host in ${CLUSTER_HOSTS[@]}; do          [[ "$host" == "$my_ip" ]] && continue            HOSTS_TO_SYNC+=("${host}")    done# Для каждого remote_host в списке синхронизации HOSTS_TO_SYNCfor remote_host in ${HOSTS_TO_SYNC[@]}; do   # В режиме dry-run (без изменений) ищем файлы,    # которые отличаются на удаленной машине от файлов на локальной машине.   echo "Sync with ${remote_host}."   echo -n "Scanning difference of files... "   diff_files=$(   rsync --recursive \         --relative \         --itemize-changes  \         --dry-run \         --delete \         --links \         --checksum \         --group --owner --perms --no-times \         -e "ssh -i ${ssh_key}" \         ${exclude_file_list} \         ${file_list} \         ${kafkauser}@${remote_host}:/   )   echo -e "Done\n#######################################"   # Если найдены файлы, которые отличаются, то вывести подсказку для администратора,    # а затем список найденых отличий.   if [ -n "$(echo ${diff_files})" ] ; then      echo "       Description for string [<>][fd]cstpoguax:        <     transferred to the remote host.             c     different checksum        >     received to the local host.                 s     different size         .     the item is not being updated               T     modification time will be  set to the transfer time        f     file                                        p     different permissions         d     directory                                   o     different owner         L     symlink                                     g     different group                                                          a     ACL information changed                                                          x     the extended attribute information changed"      echo -e "#######################################\n\nList of file differences\n"        # Список отличающихся файлов      echo "${diff_files}" | sed '/^$/d'      # Скачиваем файлы на удаленной машине во временную локальную папку       # для проведения сравнения.      echo -en "\n#######################################\nMaking cache of remote files to compare. "      remote_file_list=$(echo "${diff_files}" | awk '{print $2}' | sed '/^ *$/d; /\/bin/d; s/^/'${kafkauser}@${remote_host}:'/')      echo -e "Downloading files to ${tmp_dir}:\n${remote_file_list}\n"      mkdir -p ${tmp_dir}       rsync -e "ssh -i ${ssh_key}" \            --recursive \            --relative \            --links \            $remote_file_list \            ${tmp_dir} 2>/dev/null      echo -e "\nDone\n#######################################"  # Для каждого файла смотрим отличия и выводим их для администратора      while read line; do           echo -e "\n\nComparing file\n${line}\n\n"           rfile=$(echo $line | awk '{print $2}')           echo "diff ${remote_host}:${rfile} ${rfile}"           diff ${tmp_dir}/${rfile} ${rfile}           echo -e "\n\n#######################################\n\n"      done < <(echo "${diff_files}")      # Удаляем временную локальную папку      echo "Removing Local ${tmp_dir}"      rm -r ${tmp_dir}  # Если установлен флаг интерактивности, подождать решения администратора,       # пока он визуально просмотрит планируемые изменения и подтвердит их введя "y".      # Для отказа - "n" или "Ctrl" + "C".      if [ "${interactive}" == "true" ]; then          while : ; do              echo -e "\nDo you wish sync changes?"               read LINE              echo "User answer ${LINE}"               [ "${LINE}" == "y" ] && break              [ "${LINE}" == "n" ] && exit 1          done      fi  # Сохранение уникальных настроек удаленного брокера      echo "Building remote server.properties.uniq ... "      ssh -i ${ssh_key} ${kafkauser}@${remote_host} "                 /opt/kafka-config/splitcfg.sh                  " && echo "Done"        # Синхронизация конфигурации с брокером      echo "Sync with ${remote_host}... "      rsync --recursive \            --relative \            --links \            --delete \            --checksum \            --group --owner --perms --no-times \            -e "ssh -i ${ssh_key}" \            ${exclude_file_list} \            ${file_list} \            ${kafkauser}@${remote_host}:/ \      && echo "Done"      # Запуск сбора конфигурации server.properties на удаленной машине      echo "Building remote server.properties ... "      ssh -i ${ssh_key} ${kafkauser}@${remote_host} "                 /opt/kafka-config/mergecfg.sh                  " && echo "Done"   else      echo -e "\n(No changes found)\n"   fidone

Скрипт выполняет замену конфигурации брокера и узла Zookeeper, однако не выполняет их перезапуск.

Перезапуск брокеров и Zookeeper нужно делать по очереди, контролируя синхронизацию данных в партициях, подключения клиентов к брокерам, состояние балансировок в консьюмер‑группах.

Можно добавить перезапуск в скрипт с задержками, однако я предпочитаю контролировать этот процесс вручную, заглядывая в логи, все‑таки прод. Это позволяет вовремя понять правильный момент для перезапуска, а также отловить потенциальные ошибки в конфигурации — если один брокер начал ругаться при рестарте, то это повод откатить его конфигурацию назад.

ссылка на оригинал статьи https://habr.com/ru/articles/947378/