Kafka как хранилище справочников

от автора

О чем статья

Хочу поделиться тем, как мы используем kafka для организации оперативного хранилища справочной информации.

Архитектура системы и требования

Сейчас я работаю на достаточно высоконагруженном проекте по обработке финансовых транзакций. Проект состоит из множества сервисов. Часть из которых условно можно выделить как транзакционное online flow — которое отвечает за то, чтобы надежно и быстро обработать поступающие на вход транзакции (списать оплату и выпустить чек). 

Центральное место в данной части системы занимает Kafka — входные данные обрабатываются, обогащаются и перекладываются из topic в topic и в конце концов обработанные транзакции должны быть сохранены в хранилище master data (postgres), по которым впоследствии уже выпускается какая-то отчетность, производятся сверки и другие взаимодействия с пользователями системы. 

Есть требование к online flow, что на обработку транзакций не должна влиять временная недоступность Postgres ( т.е. допустимы какие-то инфраструктурные работы по обновлению на 2-3 часа, в ходе которых БД будет недоступно). При этом есть допущение, что Kafka высокодоступная система. Т.е. при недоступности БД допустимо, что пользователи системы могут какое-то время не получать актуальные отчеты, но финансовые транзакции по списанию денег и выпуску чеков — должны продолжать работать при доступности внешних сервисов банков и Kafka.

Справочники

В сервисах требуется вести справочники, которые могут периодически обновляться службой поддержки. Справочники представляют из себя данные или настройки, которые требуются для обработки транзакций.

В данной статье как пример возьмем маленький справочник terminal_acquirer: каждому terminal_id соответствует acquirer_id. На основе этого маппинга один из сервисов принимает решение в каком банке проводить авторизацию.

В реальности справочники — это таблицы 5-10 полей размером несколько тысяч записей, которые могут иногда обновляться и дополняться.

Kafka как хранилище для справочников

Для службы сопровождения системы вести данные справочники удобно в реляционной БД( в нашем случае в Postgres). Удобно делать выборки, обновления, есть стандартные клиенты для взаимодействия, надежность хранения, ACID.

Чтобы покрыть описанное выше требование для online flow части, что сервисы должны “не заметить” временную недоступность Postgres применим подход описанный в conluent guides по построению streaming applications с той лишь разницей, что не будем использовать Kafka Streams.

Вкратце это будет выглядеть так:

  1. Редактируем данные справочника в Postgres

  2. Kafka Jdbc Source Connector выгружает обновления в Kafka topic

  3. Внутри сервиса  помещаем компонент, который вычитывает при старте все сообщения из Kafka topic и слушает его дальше,формируя in-memory структуру, к которой обращаются другие компоненты сервиса.

Проиллюстрируем это:

иллюстрация решения

иллюстрация решения

Kafka Connector

Про Kafka Connect Framework и JDBC Sink Connector можно почитать здесь.

Kafka JDBC Source Сonnector используется для того, чтобы периодически выкачивать обновления из БД в Kafka topic.

Необходимо правильно выбрать что будет использоваться в качестве record key — в большинстве случаев он должен совпадать с unique index в БД и быть неизменяемым.

В наших сервисах используется стандартный Kafka JDBC Source Connector c mode = timestamp + incrementing. Про стандартный Jdbc Source Connector, различные режимы и настройки можно почитать здесь. При таком режиме работы нужно завести в таблице служебное timestamp поле и служебное поле id с autoincrement. 

Проиллюстрируем как действия по вставке и удалению данных в таблицу будут отображаться в Kafka Topic:

Postgres

Kafka

offset

key

value

insert row1

0

key(row1)

value(row1)

insert row2

1

key(row2)

value(row2)

update row1

2

key(row1)

value(row1)

При этом важно чтобы key(row1) offset 0 == key(row1) offset 2. При этом value(row1) offset 0 != value(row1) offset 2

При таком подходе если удалить запись из БД, то это никак не отобразиться в Kafka Topic, т.к. стандартный Jdbc Source Connector по сути делает периодический select. Для того, чтобы можно было бы как-то выключать записи, а потом и удалять, можно ввести служебное поле isActive, сделать constraint который запрещает удалять данные с isActive = true, либо вообще их не удалять. Kafka connector может слать tombstone в случае isActive = false, либо можно слать значение как есть и отдать эту логику на откуп Consumer компоненту.

Можно также использовать debezium jdbc source connector, в котором как я понимаю не нужны такие ухищрения со служебными полями, т.к. он подключается к транзакционному логу masterа Postgres. Но в таком случае нужно обеспечить подключение коннектора к актуальной master node Postgres, что тоже может быть нетривиально.

Таким образом любой сервис, который слушает Kafka topic, может собрать у себя актуальную реплику справочника. Но нужно не забывать, чтобы key(row i) не изменялся. В таком случае мы можем применить к kafka topic retention policy compacted. Т.к. нас интересует только последний record по ключу пусть kafka удаляет старые данные для одинаковых ключей в соответствии с retention policy.

Вернемся к примеру с terminal_acquirer. Скрипты для создания справочника в БД со всеми необходимыми предусловиями, описанными выше:

CREATE TABLE termainal_acquirer ( id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY, terminal_id text NOT NULL, acquirer_id text NOT NULL, is_active bool NOT NULL DEFAULT true, updated_date timestamp NOT NULL DEFAULT timezone('utc'::text, CURRENT_TIMESTAMP), CONSTRAINT terminal_acquirer_pkey PRIMARY KEY (id) );  CREATE UNIQUE INDEX terminal_acquirer_unique_idx ON termainal_acquirer USING btree (terminal_id) WHERE (is_active IS TRUE);  -- Триггер, который предотвращает удаление из таблицы create trigger trigger_terminal_acquirer_delete before delete  on  termainal_acquirer for each row execute function fun_terminal_acquirer_delete();  – Триггер для обновления updated_date create trigger trigger_terminal_acquirer_update before update on  termainal_acquirer for each row execute function fun_terminal_acquirer_update();  –---Функция для обновления даты  CREATE OR REPLACE FUNCTION fun_terminal_acquirer_update()  RETURNS trigger  LANGUAGE plpgsql AS $function$             begin               if new.terminal_id <> old.terminal_id then                 raise exception 'terminal_id cannot be changed';               end if;               new.updated_date = current_timestamp at time zone 'utc';               return new;             end;             $function$ ; –---Функция для предупреждения удаления CREATE OR REPLACE FUNCTION fun_terminal_acquirer_delete()  RETURNS trigger  LANGUAGE plpgsql AS $function$             begin               if old.is_active <> false then                 raise exception 'Active settings (is_active = true) can not be deleted. Make it inactive firstly (is_active = false)';               end if;               return old;             end;       $function$; 

Конфигурация Kafka Connect Task, которая выгружает обновления из таблицы БД и складывает их в Kafka Topic в формате формате key = terminal_id и value = json(row):

{  "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",  "connection.url":"${tpp:reference-data.db.url}",  "connection.user": "${tpp:reference-data.db.user}",  "connection.password": "${tpp:reference-data.db.password}",  "dialect.name" : "PostgreSqlDatabaseDialect",    "key.converter": "org.apache.kafka.connect.storage.StringConverter",  "value.converter": "org.apache.kafka.connect.json.JsonConverter",  "value.converter.schemas.enable": "false",  "table.whitelist": "terminal_acquirer",  "topic.prefix": "${tpp:kafka.topic.prefix}",    "mode": "timestamp+incrementing",  "incrementing.column.name": "id",  "timestamp.column.name": "updated_date",    "transforms": "changeCase,copyId,extractId,updatedDateFormat",    "transforms.changeCase.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value",  "transforms.changeCase.from" : "LOWER_UNDERSCORE",  "transforms.changeCase.to" : "LOWER_CAMEL",    "transforms.copyId.type": "org.apache.kafka.connect.transforms.ValueToKey",  "transforms.copyId.fields": "id",    "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",  "transforms.extractId.field": "id",   "transforms.updatedDateFormat.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.updatedDateFormat.format": "yyyy-MM-dd HH:mm:ss.SSS", "transforms.updatedDateFormat.target.type": "string", "transforms.updatedDateFormat.field": "updatedDate" }

Проиллюстрируем соответствии событий в БД и Kafka:

Postgres

Kafka

offset

key

value

INSERT INTO termainal_acquirer (terminal_id, acquirer_id) VALUES(‘1’, ‘1’);

0

1

{

“id” : “1”,

“terminalId”:”1”,

“acquirerId”:”1”,

“isActive”:true,

“updateDate”:”2024-01-01 11:11:11.000”

}

INSERT INTO termainal_acquirer (terminal_id, acquirer_id) VALUES(‘2’, ‘2’);

1

2

{

“id” : “2”,

“terminalId”:”2”,

“acquirerId”:”2”,

“isActive”:true,

“updateDate”:”2024-01-01 11:11:12.000”

}

UPDATE termainal_acquirer SET acquirer_id=’3′ WHERE terminal_id = ‘1’

2

1

{

“id” : “1”,

“terminalId”:”1”,

“acquirerId”:”3”,

“isActive”:true,

“updateDate”:”2024-01-01 11:11:13.000”

}

Kafka Store Component

В Kafka Streams framework есть концепция Store, где Kafka topic выступает в качестве персистентного хранилища. В Kafka Streams есть разные реализации для работы с topic как хранилищем. Различают Global Store — хранилище, которое содержит все данные из всех партиций топика и простой State Store, который партиционирован, т.е. экземпляр приложения содержит данные в хранилище только из партиций, назначенных данному экземпляру клиента. В свою очередь есть различные реализации хранилищ по типу хранения данных, например это может быть In-Memory хранилище или хранилище, использующие какие-то embedded DB, например Rocks DB.

В наших сервисах, так исторически сложилось, для обработки сообщений используется реактивный клиент. В целом Kafka Streams накладывает определенные ограничения на параллелизацию обработки сообщений ( количество параллельных тасок зависит от количества партиций выделенных на топике и не может быть больше). Т.к. при обработке сообщений из Kafka topic нам требуется выполнять вызовы внешних сервисов, запросы к Cassandra, то реактивный подход обработки сообщений из топика батчами позволяет более эффективно использовать ресурсы.

Концепция с In-memory Global Store кажется очень подходит для эффективной и удобной работы со справочниками в наших сервисах. У нас есть некий Kafka topic, который хранит в себе выгрузку справочных данных из Postgres.

Реализуем данную концепцию без Kafka Streams. При старте приложения мы должны вычитать все сообщения, которые есть в топике-справочнике на момент старта, сформировав некую In-Memory структуру на стороне сервиса( это может быть ConcurrentHashMap). После этого, этого мы можем начать собственно обработку сообщений из Kafka, при этом должны продолжать слушать топик-справочник и обновлять нашу In-Memory структуру, чтобы поддерживать актуальной реплику справочника на стороне сервиса.

Приведу реализацию базового абстрактного класса с метриками, который вычитывает весь топик при старте приложения и затем слушает новые сообщения с абстрактным методом process, реализация которого должна отвечать за формирования справочника:

abstract class KafkaGlobalStore(     private val kafkaStoreProperties: KafkaProperties,     private val topicSource: List<String>,     private val metricSchedulePeriod: Duration = Duration.ofSeconds(5) ) {      private companion object : Log()      private val properties = Properties().apply {         setProperty("bootstrap.servers", kafkaStoreProperties.bootstrapServers)         setProperty("auto.offset.reset", "none")         setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")         setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")          if (kafkaStoreProperties.sslEnabled) {             setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")             setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaStoreProperties.truststorePath)             setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaStoreProperties.truststorePassword)             setProperty(                 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,                 kafkaStoreProperties.sslEndpointIdentificationAlgorithm             )         }     }      // KafkaConsumer is not safe for multithreaded access     private val consumer = KafkaConsumer<ByteArray?, ByteArray>(properties)     private val metricConsumer = KafkaConsumer<ByteArray?, ByteArray>(properties)      private lateinit var topicPartitions: List<TopicPartition>     private val positions = ConcurrentHashMap<TopicPartition, Long>()     private val errors = ConcurrentHashMap<Long, StoreError>()      protected abstract fun process(record: ConsumerRecord<ByteArray?, ByteArray>)      @PostConstruct     fun init() {         StoreMetrics.startInitTime(topicSource.toString())         topicPartitions = topicSource.flatMap { topic ->             consumer.partitionsFor(topic).map { TopicPartition(it.topic(), it.partition()) }         }         val highWatermarkBarrier = CountDownLatch(1)          Thread {             val consumer = KafkaConsumer<ByteArray?, ByteArray>(properties)             consumer.assign(topicPartitions)             consumer.seekToBeginning(topicPartitions)              val highWatermarks = consumer.endOffsets(topicPartitions)              while (true) {                 if (highWatermarks.isNotEmpty()) {                     topicPartitions.forEach { partition ->                         highWatermarks[partition]                             ?.also { hwm ->                                 if (hwm <= consumer.position(partition)) {                                     highWatermarks.remove(partition)                                 }                             }                     }                     if (highWatermarks.isEmpty()) {                         highWatermarkBarrier.countDown()                     }                 }                  consumer.poll(Duration.ofMillis(kafkaStoreProperties.consumer.maxPollInterval.toLong()))                     .also { batch ->                         batch.partitions()                             .forEach { topicPartition ->                                 batch.records(topicPartition).forEach { record ->                                     kotlin.runCatching { process(record) }                                         .onFailure { ex ->                                             log.error(                                                 "Error processing record in store: key: ${record.key()} offset: ${record.offset()}",                                                 ex                                             )                                             errors[record.offset()] = ex.toError(record)                                             StoreMetrics.updateMetric(StoreMetrics.Error, Tags.empty(), errors.size.toLong())                                         }                                 }                                 val offset = consumer.position(topicPartition)                                 positions[topicPartition] = offset                                 StoreMetrics.updateMetric(StoreMetrics.Offset, topicPartition, offset)                             }                     }             }         }.start()         Timer("store-metric-thread", true).schedule(0L, metricSchedulePeriod.toMillis()) {             metricConsumer.endOffsets(topicPartitions).forEach { (topicPartition, value) ->                 StoreMetrics.updateMetric(StoreMetrics.Watermark, topicPartition, value)             }         }          highWatermarkBarrier.await()         StoreMetrics.finishInitTime(topicSource.toString())     }      fun endOffsets(): Map<TopicPartition, Long> = consumer.endOffsets(topicPartitions)     fun positions(): Map<TopicPartition, Long> = positions.toMap()     fun errors(): Map<Long, StoreError> = errors.toMap()      private fun Throwable.toError(record: ConsumerRecord<ByteArray?, ByteArray>) =         StoreError(record.offset(), record.key(), record.topic(), record.partition(), record.timestamp(), message, stackTraceToString()) } 

Продолжая наш примера с terminal_acquirer приведем пример реализации этого справочника:

class ReferenceDataStore(    kafkaProperties: KafkaProperties,    topicSource: List<String>,    val objectMapper: ObjectMapper ) : KafkaGlobalStore(kafkaProperties, topicSource) {      private val terminalAcquirerMap = ConcurrentHashMap<String,String>()    override fun process(record: ConsumerRecord<ByteArray?, ByteArray>) {        val referenceData = objectMapper.readReferenceData(record.value())        if (referenceData.isActive)            terminalAcquirerMap[referenceData.terminalId] = referenceData.acquirerId        else            terminalAcquirerMap.remove(referenceData.terminalId)    }       fun getAcquirerId(terminalId: String): String? = terminalAcquirerMap[terminalId] }

При такой реализации наш сервис содержит в себе полную актуальную реплику нашего справочника из БД terminal_acquirer в виде ConcurrentHashMap. Данная реализация позволяет эффективно работать со справочником, т.к. обращение происходит к обновляемой In-Memory структуре.

Потребляемые ресурсы и время загрузки

При таком подходе нужно аккуратно оценивать размер In-Memory структуры и ресурсы, необходимые для работы приложения с учетом особенностей работы с памятью JVM. Также необходимо оценивать время, которое необходимо на старте приложению, для того, чтобы вычитать, десериализовать и сложить в In-Memory структуру все данные из топика-справочника.

На данный момент все наши справочники небольшие и время их загрузки и потребляемые ресурсы не требуют серьезной калибровки настроек сервиса. Но необходимо учитывать и тестировать приложение на случай, если количество этих данных может кратно возрасти. 

Если говорить о приведенном примере со справочником terminal_acquirer, то я провел тест с наполнением справочника 50 000 значениями. Сервис был успешно запущен с параметрами: 

limits:    cpu: '2'    memory: 2000Mi  requests:    cpu: '2'    memory: 1000Mi

Время инициализации хранилища на старте( вычитка всех 50 000 значений в in memory) ~ 2510ms, что не так критично для spring boot service. В среднем в наших сервисах данных в справочниках сейчас на порядок меньше, но и при таком порядке время загрузки и потребляемые ресурсы выглядят для нас удовлетворительно.

Что делать, если все же появятся большие справочники

Сейчас наши справочники достаточно небольшие и в обозримом будущем не предполагается их увеличение на порядок. Поэтому подхода с In-Memory Global Store достаточно. Но что делать, если все же появятся кратно большие справочники?

Есть 2 проблемы в таком случае: 

  1. In-Memory хранилища недостаточно

  2. Вычитка и обработка всего топика занимает слишком много времени

В таком случае можно также подсмотреть подходы, которые используются в Kafka Streams. Например использовать вместо In-Memory структуры embedded DB. В Kafka Streams используется Rocks DB — быстрая key-value embedded DB. Использование Rocks DB позволит снизить потребления оперативной памяти, что решит проблему 1). Проблему 2) можно решить за счет подключения persistent volume, что позволит не вычитывать заново весь топик при рестарте сервиса,а читать только новые данные.

Заключение

Приведенный подход позволяет использовать в своих сервисах Kafka Topic как хранилище данных, не затягивая Kafka Streams Framework в стек, если по каким-то причинам этого делать не хочется. К тому же Kafka Streams имеет ряд ограничений, в том числе если нужна какая-то кастомизация хранилищ, то там на мой взгляд это делать неудобно. В данное реализации есть возможность более гибко организовывать хранилища.


ссылка на оригинал статьи https://habr.com/ru/articles/852092/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *