
Если вы настроили многоузловой кластер Kafka, то, вероятно, знаете, что в нем есть части конфигурации, общие для кластера, а есть уникальные для каждого узла.
В этой заметке я описываю свой способ проведения централизованного обновления конфигурации брокеров.
Поменяли на одном брокере — настройки применились везде.
Bourne again shell. Погнали!
Синхронизировать настройки будем при помощи rsync+ssh. Подключение к узлам будет под тем же служебным пользователем, под которым работает ПО Kafka.
Но перед синхронизацией необходимо разделить конфигурационный файл server.properties на две отдельные составляющие — server.properties.uniq и server.properties.common.
server.properties.uniq — настройки уникальные для каждого брокера.
server.properties.common — настройки общие для каждого брокера.
Полный конфигурационный файл брокера можно не читать, там много настроек, которые не являются предметом заметки, я его приведу в свёрнутом виде.
server.properties
############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.broker.id=1broker.rack=DC1advertised.listeners=SASL_SSL://yamaha1.bercut.com:9093advertised.host.name=yamaha1.bercut.comlisteners=SASL_SSL://yamaha1.bercut.com:9093#Kerberoslistener.name.sasl_ssl.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true \ keyTab="/opt/kafka-secret/kafka-server-yamaha1.keytab" principal="KAFKA/yamaha1.bercut.com";############################# End of Server Basics ################################################### Zookeeper #################################zookeeper.connection.timeout.ms=18000zookeeper.connect=yamaha1.bercut.com:2182,yamaha2.bercut.com:2182,yamaha3.bercut.com:2182## Properties for SSL Zookeeper Security between Zookeeper and Brokerzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNettyzookeeper.ssl.client.enable=truezookeeper.ssl.protocol=TLSv1.2zookeeper.ssl.truststore.location=/opt/kafka-config/ssl/kafka/zookeeper-client.truststore.jkszookeeper.ssl.truststore.password=ChangeThisZKeySecretzookeeper.ssl.keystore.location=/opt/kafka-config/ssl/kafka/zookeeper-client.keystore.jkszookeeper.ssl.keystore.password=ChangeThisZKeySecretzookeeper.set.acl=true############################ SSL and SASL settings ############################### put this line if your certificate does not contain FQDN#ssl.endpoint.identification.algorithm=ssl.keystore.location=/opt/kafka-config/ssl/kafka/kafka.server.keystore.jksssl.keystore.password=ChangeThisKKeySecretssl.key.password=ChangeThisKKeySecretssl.truststore.location=/opt/kafka-config/ssl/kafka/kafka.server.truststore.jksssl.truststore.password=ChangeThisKKeySecretssl.protocol=TLSv1.2security.inter.broker.protocol=SASL_SSL##ssl.client.auth=requiredssl.client.auth=none##Properties for SASL beetween the brokers and clients#sasl.enabled.mechanisms=SCRAM-SHA-512sasl.enabled.mechanisms=SCRAM-SHA-512,GSSAPIsasl.kerberos.service.name=kafkasasl.mechanism.inter.broker.protocol=SCRAM-SHA-512listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="broker-admin" password="ChangeThisAdminSecret";super.users=User:broker-admin##Properties for Authorizationauthorizer.class.name=kafka.security.authorizer.AclAuthorizer# disable this for more security allow.everyone.if.no.acl.found=false############################# Socket Server Settings ############################## Maps listener names to security protocols, the default is for them to be the same. # See the config documentation for more details#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# The number of threads that the server uses for receiving requests from the network and sending responses to the networknum.network.threads=8# The number of threads that the server uses for processing requests, which may include disk I/Onum.io.threads=16# The send buffer (SO_SNDBUF) used by the socket serversocket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket serversocket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log fileslog.dirs=/data/kafka-logs# The default number of log partitions per topic. More partitions allow greater# parallelism for consumption, but this will also result in more files across# the brokers.num.partitions=2# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.# This value is recommended to be increased for installations with data dirs located in RAID array.num.recovery.threads.per.data.dir=2############################# Internal Topic Settings ############################## The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.offsets.topic.replication.factor=3transaction.state.log.replication.factor=3transaction.state.log.min.isr=2offsets.topic.min.isr=2############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync# the OS cache lazily. The following configurations control the flush of data to disk.# There are a few important trade-offs here:# 1. Durability: Unflushed data may be lost if you are not using replication.# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.# The settings below allow one to configure the flush policy to flush data after a period of time or# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disklog.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flushlog.flush.interval.ms=1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can# be set to delete segments after a period of time, or after a given size has accumulated.# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age#7 days#log.retention.hours=168#4 dayslog.retention.ms=345600000# A size-based retention policy for logs. Segments are pruned from the log unless the remaining# segments drop below log.retention.bytes. Functions independently of log.retention.hours.#1GBlog.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created#1GBlog.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according# to the retention policies#5minlog.retention.check.interval.ms=300000############################# Group Coordinator Settings ############################## The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.# The default value for this is 3 seconds.# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.group.initial.rebalance.delay.ms=3000############################# General Topics Settings #############################delete.topic.enable=falseauto.create.topics.enable=falseconfig.storage.replication.factor=3replica.lag.time.max.ms=2000replica.fetch.wait.max.ms=200min.insync.replicas=2default.replication.factor=3unclean.leader.election.enable = falsenum.replica.fetchers=2############################ consumer local read connections priority ###################replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
Ключевой момент в конфигурации в том, что он разделен на тематические секции, а секция «Server Basics» скриптом автоматически отделится в server.properties.uniq.
Вот как можно сделать сплит. Перед сплитом на всякий случай сохраняем предыдущие версии файлов. Пишем splitcfg.sh.
#!/bin/bashcfg="server.properties"mydir=$(dirname "$0")cd "${mydir}"umask 026if [[ -f "${cfg}.uniq" ]]; then echo Saving old ${cfg}.uniq to ${cfg}.uniq.bak mv ${cfg}.uniq ${cfg}.uniq.bakfiif [[ -f "${cfg}.common" ]]; then echo Saving old ${cfg}.common to ${cfg}.common.bak mv ${cfg}.common ${cfg}.common.bak fiuniqtxt=falsewhile read line; do if [[ "$line" == "############################# Server Basics #############################" ]]; then uniqtxt=true fi if [[ "$uniqtxt" == "true" ]]; then echo "$line" >> ${cfg}.uniq else echo "$line" >> ${cfg}.common fi if [[ "$line" == "############################# End of Server Basics ######################" ]]; then uniqtxt=false fidone < <(cat "${cfg}")
Отлично, splitcfg.sh готов, также нам понадобится еще один простой скрипт, который склеивает конфигурацию обратно (mergecfg.sh):
#!/bin/bashcfg="server.properties"mydir=$(dirname "$0")cd "${mydir}"umask 026cat ${cfg}.uniq ${cfg}.common > $cfg
splitcfg.sh и mergecfg.sh раскладываем на все брокеры.
Теперь нужно сделать выборочную синхронизацию файлов с другими брокерами.
Комментарии по коду:
#!/bin/bash# Список всех брокеров. # Скрипт сам поймет, с кем нужно делать синхронизацию, а с кем нет (с собой не надо).CLUSTER_HOSTS=(10.1.1.1 10.1.1.2 10.1.1.3)# Пользователь, под которым будет идти синхронизация по rsync+sshkafkauser="kafka"# В моем случае бинарные файлы kafka лежат по пути /opt/kafka_версия, # а симлинк /opt/kafka указывает на него (для простоты обновления версии).# Обределяем путь до папки kafka.kafkadir="$(readlink -f /opt/kafka)"# SSH ключ, под которым будет идти синхронизация по rsync+sshssh_key="/home/${kafkauser}/.ssh/id_ed25519"# Временная папка для синхронизации.tmp_dir="/tmp/kafka-sync"# Флажок интерективности. Скрипт синхронизации позволяет администратору сравнить# что поменялось перед перезаписью файлов.interactive="true"# Список папок для синхронизации.file_list="/opt/jmx_prometheus_javaagent/opt/kafka/opt/kafka-config${kafkadir}/bin${kafkadir}/config${kafkadir}/libs${kafkadir}/LICENSE${kafkadir}/licenses${kafkadir}/NOTICE${kafkadir}/site-docs"# Список исключений. # Не нужно синхронизировать уникальные SSL сертификаты брокеров Kafka# и узлов Zookeeper.# Не нужно синхронизировать server.properties, server.properties.uniq и его бэкап.exclude_file_list="${kafkadir}/logs/*/opt/kafka-config/ssl/opt/kafka-config/ssl/zookeeper/opt/kafka-config/ssl/kafka/opt/kafka-config/server.properties.uniq/opt/kafka-config/server.properties/opt/kafka-config/server.properties.uniq.bak "# Перед каждым элементом списка исключений добавить '--exclude=', # это необходимо по синтаксису rsync.exclude_file_list=$(echo "${exclude_file_list}" | sed 's|^| --exclude=|g')# Разделяем конфигурацию на уникальную и общую (splitcfg.sh приведен выше).echo "Spliting server.properties"/opt/kafka-config/splitcfg.sh# Определяем собственный IP адрес, формируем список хостов для синхронизации # HOSTS_TO_SYNC, не добавляя в него самого себя.my_ip=$(/sbin/ip add | sed -n '/127.0.0.1/d; s/.*inet \([0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\).*/\1/p;')for host in ${CLUSTER_HOSTS[@]}; do [[ "$host" == "$my_ip" ]] && continue HOSTS_TO_SYNC+=("${host}") done# Для каждого remote_host в списке синхронизации HOSTS_TO_SYNCfor remote_host in ${HOSTS_TO_SYNC[@]}; do # В режиме dry-run (без изменений) ищем файлы, # которые отличаются на удаленной машине от файлов на локальной машине. echo "Sync with ${remote_host}." echo -n "Scanning difference of files... " diff_files=$( rsync --recursive \ --relative \ --itemize-changes \ --dry-run \ --delete \ --links \ --checksum \ --group --owner --perms --no-times \ -e "ssh -i ${ssh_key}" \ ${exclude_file_list} \ ${file_list} \ ${kafkauser}@${remote_host}:/ ) echo -e "Done\n#######################################" # Если найдены файлы, которые отличаются, то вывести подсказку для администратора, # а затем список найденых отличий. if [ -n "$(echo ${diff_files})" ] ; then echo " Description for string [<>][fd]cstpoguax: < transferred to the remote host. c different checksum > received to the local host. s different size . the item is not being updated T modification time will be set to the transfer time f file p different permissions d directory o different owner L symlink g different group a ACL information changed x the extended attribute information changed" echo -e "#######################################\n\nList of file differences\n" # Список отличающихся файлов echo "${diff_files}" | sed '/^$/d' # Скачиваем файлы на удаленной машине во временную локальную папку # для проведения сравнения. echo -en "\n#######################################\nMaking cache of remote files to compare. " remote_file_list=$(echo "${diff_files}" | awk '{print $2}' | sed '/^ *$/d; /\/bin/d; s/^/'${kafkauser}@${remote_host}:'/') echo -e "Downloading files to ${tmp_dir}:\n${remote_file_list}\n" mkdir -p ${tmp_dir} rsync -e "ssh -i ${ssh_key}" \ --recursive \ --relative \ --links \ $remote_file_list \ ${tmp_dir} 2>/dev/null echo -e "\nDone\n#######################################" # Для каждого файла смотрим отличия и выводим их для администратора while read line; do echo -e "\n\nComparing file\n${line}\n\n" rfile=$(echo $line | awk '{print $2}') echo "diff ${remote_host}:${rfile} ${rfile}" diff ${tmp_dir}/${rfile} ${rfile} echo -e "\n\n#######################################\n\n" done < <(echo "${diff_files}") # Удаляем временную локальную папку echo "Removing Local ${tmp_dir}" rm -r ${tmp_dir} # Если установлен флаг интерактивности, подождать решения администратора, # пока он визуально просмотрит планируемые изменения и подтвердит их введя "y". # Для отказа - "n" или "Ctrl" + "C". if [ "${interactive}" == "true" ]; then while : ; do echo -e "\nDo you wish sync changes?" read LINE echo "User answer ${LINE}" [ "${LINE}" == "y" ] && break [ "${LINE}" == "n" ] && exit 1 done fi # Сохранение уникальных настроек удаленного брокера echo "Building remote server.properties.uniq ... " ssh -i ${ssh_key} ${kafkauser}@${remote_host} " /opt/kafka-config/splitcfg.sh " && echo "Done" # Синхронизация конфигурации с брокером echo "Sync with ${remote_host}... " rsync --recursive \ --relative \ --links \ --delete \ --checksum \ --group --owner --perms --no-times \ -e "ssh -i ${ssh_key}" \ ${exclude_file_list} \ ${file_list} \ ${kafkauser}@${remote_host}:/ \ && echo "Done" # Запуск сбора конфигурации server.properties на удаленной машине echo "Building remote server.properties ... " ssh -i ${ssh_key} ${kafkauser}@${remote_host} " /opt/kafka-config/mergecfg.sh " && echo "Done" else echo -e "\n(No changes found)\n" fidone
Скрипт выполняет замену конфигурации брокера и узла Zookeeper, однако не выполняет их перезапуск.
Перезапуск брокеров и Zookeeper нужно делать по очереди, контролируя синхронизацию данных в партициях, подключения клиентов к брокерам, состояние балансировок в консьюмер‑группах.
Можно добавить перезапуск в скрипт с задержками, однако я предпочитаю контролировать этот процесс вручную, заглядывая в логи, все‑таки прод. Это позволяет вовремя понять правильный момент для перезапуска, а также отловить потенциальные ошибки в конфигурации — если один брокер начал ругаться при рестарте, то это повод откатить его конфигурацию назад.
ссылка на оригинал статьи https://habr.com/ru/articles/947378/