В данной статье приведу реализацию своего kafka http sink connector. Он не претендует на универсальность, но возможно поможет разобраться как разработать свой connector.
Confluent Http Sink Connector — платный, другие варианты с github мне не подошли. Про Kafka Connect можно почитать здесь. Статья предполагает, что есть понимание того зачем нужен Kafka Connect Framework и как его использовать. Представленный код написан на Kotlin.
Для начала зададим Schema для нашего коннектора:
val HTTP_REQUEST_SCHEMA: Schema = SchemaBuilder.struct() .name(HTTP_REQUEST_SCHEMA_NAME) .field(FIELD_HTTP_METHOD, Schema.STRING_SCHEMA) .field( FIELD_HTTP_HEADERS, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build() ) .field(FIELD_HTTP_BODY, Schema.OPTIONAL_STRING_SCHEMA) .field(FIELD_HTTP_URL, Schema.STRING_SCHEMA) .build()
На вход коннектора должна обязательно поступать Struct со Schema = HTTP_REQUEST_SCHEMA, т.е. сообщения из Kafka с помощью converter и transforms должны быть приведены к Struct со схемой HTTP_REQUEST_SCHEMA.
Для реализации коннектора будем использовать стандартный java.net.http.HttpClient. Класс конфигурации для коннектора будет выглядеть так:
class HttpSinkConfig(private val props: Map<String, String>) : AbstractConfig(configDef, props) { companion object { private const val RESPONSE_VALIDATOR_CLASS_NAME = "response.validator" private const val CONNECTION_TIMEOUT_MS = "connectionTimeoutMs" val configDef = ConfigDef() .define( RESPONSE_VALIDATOR_CLASS_NAME, ConfigDef.Type.STRING, HttpSuccessStatusResponseValidator::class.java.name, ConfigDef.Importance.HIGH, "Class name of validator" ) .define( CONNECTION_TIMEOUT_MS, ConfigDef.Type.LONG, 2000, ConfigDef.Importance.HIGH, "Http connection timeout in ms" ) } fun responseValidator(): HttpResponseValidator = (Class.forName(getString(RESPONSE_VALIDATOR_CLASS_NAME)).getDeclaredConstructor().newInstance() as HttpResponseValidator) .apply { init(props) } fun httpClient(): HttpClient = HttpClient.newBuilder() .connectTimeout(Duration.ofMillis(connectionTimeoutMs())) .build() private fun connectionTimeoutMs(): Long = getLong(CONNECTION_TIMEOUT_MS) }
В данной реализации создается дефолтный HttpClient с одной лишь настройкой, но реализацию конфигурации можно расширить, добавив ssl свойства и другие специфичные свойства, которые необходимо добавить для настройки HttpClient. Для этого по аналогии с connectionTimeoutMs нужно объявить их в ConfigDef.
responseValidator будет использоваться для валидации ответов, т.е. будет возможность задать валидатор, определяет какие ответы считать успешными, а какие ошибочными. Определим интерфейс для этих целей:
interface HttpResponseValidator : (HttpResponse<String>) -> Unit { @Throws(RetriableException::class) override fun invoke(response: HttpResponse<String>) fun init(props: Map<String, String>) }
Приведем реализацию, которая классифицирует ответы относительно http response code. Задается 3 типа http response code:
-
successCodes — трактуем ответ, как успешный
-
retryCodes — трактуем ответ как временно ошибочный, при котором выполняется переотправка
-
errorCodes — трактуем ответ как ошибочный, дальнейшее поведение зависит от настройки таски “error.tolerance”: all — обработка продолжается, none — обработка сообщений завершается
class StatusResponseValidator : HttpResponseValidator { private lateinit var retryCodes: List<Int> private lateinit var successCodes: List<Int> private lateinit var errorCodes: List<Int> override fun invoke(response: HttpResponse<String>) { if (response.statusCode() !in successCodes) { if (response.statusCode() in retryCodes || retryCodes.isEmpty() && response.statusCode() !in errorCodes) throw RetriableException("Status $response.statusCode() is not success $successCodes") else throw IllegalArgumentException("Status $response.statusCode() is not success $successCodes") } } override fun init(props: Map<String, String>) { val config = AbstractConfig( ConfigDef() .define( RETRY_CODES, ConfigDef.Type.LIST, listOf<String>(), ConfigDef.Importance.LOW, "Http response codes for retry" ) .define( SUCCESS_CODES, ConfigDef.Type.LIST, listOf("200"), ConfigDef.Importance.HIGH, "Success http response codes" ) .define( ERROR_CODES, ConfigDef.Type.LIST, listOf<String>(), ConfigDef.Importance.HIGH, "Error http response codes" ), props ) retryCodes = config.getList(RETRY_CODES).map { it.toInt() } successCodes = config.getList(SUCCESS_CODES).map { it.toInt() } errorCodes = config.getList(ERROR_CODES).map { it.toInt() } } companion object { private const val RETRY_CODES = "response.validator.codes.retry" private const val SUCCESS_CODES = "response.validator.codes.success" private const val ERROR_CODES = "response.validator.codes.error" } }
Пример того, сконфигурировать response validator в настройках таски:
{ .... "response.validator": "ru.typik.kafka.connect.task.StatusResponseValidator", "response.validator.codes.success": "200", "response.validator.codes.error": "400", .... }
Самая простая реализация таски с HttpClient будет выглядеть так:
class HttpSinkTask : SinkTask() { companion object { private val log = LoggerFactory.getLogger(this::class.java) } protected lateinit var config: HttpSinkConfig protected lateinit var httpClient: HttpClient protected lateinit var responseValidator: HttpResponseValidator override fun version(): String = "1.0" override fun stop() {} override fun flush(currentOffsets: MutableMap<TopicPartition, OffsetAndMetadata>) { } override fun start(props: Map<String, String>) { log.info("Starting http sink task...") config = HttpSinkConfig(props) httpClient = config.httpClient() responseValidator = config.responseValidator() } override fun put(records: Collection<SinkRecord>) { if (records.isEmpty()) return log.debug( "Received {} records. First record kafka coordinates:({}-{}-{}).", records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset() ) records.forEach { record -> record.toHttpRequestSafe() ?.let { request -> request.send() .also { response -> log.trace("Http request: {}, Http response: {}", request, response) response.validate(record) } } } } private fun HttpRequest.send() = try { httpClient.send(this, HttpResponse.BodyHandlers.ofString()) } catch (ex: Exception) { // Сообщения будут переобработаны через backoffTimeoutMs log.error("Error during sending http request: $this", ex) log.info("Context timeout before retry") context.timeout(config.backoffTimeoutMs()) log.info("Throw exception after context timeout") throw RetriableException(ex) } protected fun HttpResponse<String>.validate(record: SinkRecord) { try { responseValidator(this) } catch (ex: RetriableException) { // Сообщения будут переобработаны через backoffTimeoutMs log.info("Context timeout before retry") context.timeout(config.backoffTimeoutMs()) log.info("Throw exception after timeout") throw ex } catch (ex: Exception) { log.error("Matching response failed", ex) // Поведение зависит от настройки таски errors.tolerance: // * Обработка завершается, если "errors.tolerance": "none" // * Обработка продолжается, а сообщения отправляются в dead letter, // если "errors.tolerance": "all" context.errantRecordReporter().report(record, ex) } } protected fun SinkRecord.toHttpRequestSafe() = try { toHttpRequest() } catch (ex: Exception) { log.error("Invalid record", ex) // Поведение зависит от настройки таски errors.tolerance: // * Обработка завершается, если "errors.tolerance": "none" // * Обработка продолжается, а сообщения отправляются в dead letter, // если "errors.tolerance": "all" context.errantRecordReporter().report(this, ex) null } protected fun SinkRecord.toHttpRequest() = (value() as Struct) .let { httpStruct -> HttpRequest.newBuilder() .uri(URI.create(httpStruct.getHttpUrl())) .method( httpStruct.getHttpMethod(), httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody() ) .apply { httpStruct.getHttpHeaders()?.forEach { (k, v) -> header(k, v) } } .build() } }
Реализация класса коннектора:
class HttpSinkConnector : SinkConnector() { private val log = LoggerFactory.getLogger(HttpSinkConnector::class.java) private lateinit var settings: Map<String, String> override fun version(): String = "1.0" override fun start(props: MutableMap<String, String>) { log.info("Starting HttpSyncSinkConnector...") settings = props } override fun taskClass(): Class<out Task> = HabrHttpSinkTask::class.java override fun taskConfigs(maxTasks: Int): List<Map<String, String>> = List(maxTasks) { settings } override fun stop() {} override fun config(): ConfigDef = HttpSinkConfig.configDef override fun validate(connectorConfigs: Map<String, String>): Config { return super.validate(connectorConfigs) } }
Была приведена простая реализация коннектора, где сообщения будут обрабатываться строго одно за одним. Попробуем немного оптимизировать данную реализацию за счет использования асинхронного метода HttpClient.sendAsync
вместо синхронного HttpClient.send
. Идея в том, чтобы посылать несколько запросов параллельно, обрабатывая сообщения пачками. Такой подход в некоторых случаях может быть более оптимальным, если это предусмотрено реализацией Http Server.
Чтобы иметь возможность конфигурировать максимальное количество параллельных запросов приведем интерфейс и реализацию SinkRecordGrouper
, который делит список входящих сообщений на входе метода SinkTask.put
на подсписки, которые в свою очередь обрабатываются строго последовательно, при этом элементы подсписков обрабатываются параллельно:
interface SinkRecordGrouper : (List<SinkRecord>) -> List<List<SinkRecord>> { override fun invoke(records: List<SinkRecord>): List<List<SinkRecord>> fun init(props: Map<String, String>) }
Приведем реализацию, которая кроме собственно разбивки на подсписки по количеству элементов, еще и группирует их по ключу, т.е. элементы с одинаковыми ключами помещаются в разные подсписки, чтобы они не обрабатывались параллельно, а строго последовательно, в некоторых случаях такая логика необходима.
class KeyGrouper : SinkRecordGrouper { private var parallelCount by Delegates.notNull<Long>() override fun invoke(records: List<SinkRecord>): List<List<SinkRecord>> { val result = mutableListOf<List<SinkRecord>>() val batch = mutableListOf(*records.toTypedArray()) while (batch.isNotEmpty()) { val keySet = mutableSetOf<Any>() val subResult = mutableListOf<SinkRecord>() for (r in batch) { if (r.key() !in keySet) { subResult.add(r) keySet.add(r.key()) } if (subResult.size >= parallelCount) break } batch.removeAll(subResult) result.add(subResult) } return result } override fun init(props: Map<String, String>) { val config = AbstractConfig( ConfigDef() .define( PARALLEL_COUNT, ConfigDef.Type.LONG, 5, ConfigDef.Importance.LOW, "How many requests to send in parallel" ), props ) parallelCount = config.getLong(PARALLEL_COUNT) } companion object { private const val PARALLEL_COUNT = "grouper.parallelCount" } }
Приведем обновленную реализацию HttpSinkTask
с использованием HttpClient.sendAsync
и KeyGrouper
:
class HttpSinkTask : SinkTask() { companion object { private val log = LoggerFactory.getLogger(this::class.java) } private lateinit var config: HttpSinkConfig private lateinit var httpClient: HttpClient private lateinit var responseValidator: HttpResponseValidator private lateinit var grouper: SinkRecordGrouper override fun version(): String = "1.0" override fun stop() {} override fun flush(currentOffsets: MutableMap<TopicPartition, OffsetAndMetadata>) { } override fun start(props: Map<String, String>) { log.info("Starting http sink task...") config = HttpSinkConfig(props) httpClient = config.httpClient() responseValidator = config.responseValidator() grouper = KeyGrouper().apply { init(props) } } override fun put(records: Collection<SinkRecord>) { if (records.isEmpty()) return log.debug( "Received {} records. First record kafka coordinates:({}-{}-{}).", records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset() ) grouper(records.toList()).forEach { subRecords -> subRecords.map { record -> record.toHttpRequestSafe() ?.let { request -> request.sendAsync() ?.exceptionally { ex -> // Батч будет переобработан через backoffTimeoutMs log.error("Error handling response, http request: $this", ex) log.info("Context timeout before retry") context.timeout(config.backoffTimeoutMs()) log.info("Throw exception after context timeout") throw RetriableException(ex) } ?.thenApply { response -> response.validate(record) } } } .forEach { try { it?.join() } catch (ex: CompletionException) { ex.cause?.let { throw it } } } } } private fun HttpRequest.sendAsync() = try { httpClient.sendAsync(this, HttpResponse.BodyHandlers.ofString()) } catch (ex: Exception) { // Батч будет переработа после backoffTimeoutMs log.error("Error during sending http request: $this", ex) log.info("Context timeout before retry") context.timeout(config.backoffTimeoutMs()) log.info("Throw exception after context timeout") throw RetriableException(ex) } protected fun HttpResponse<String>.validate(record: SinkRecord) { try { responseValidator(this) } catch (ex: RetriableException) { // Сообщения будут переобработаны через backoffTimeoutMs log.info("Context timeout before retry") context.timeout(config.backoffTimeoutMs()) log.info("Throw exception after timeout") throw ex } catch (ex: Exception) { log.error("Matching response failed", ex) // Поведение зависит от настройки таски errors.tolerance: // * Обработка завершается, если "errors.tolerance": "none" // * Обработка продолжается, а сообщения отправляются в dead letter, // если "errors.tolerance": "all" ) context.errantRecordReporter().report(record, ex) } } protected fun SinkRecord.toHttpRequestSafe() = try { toHttpRequest() } catch (ex: Exception) { log.error("Invalid record", ex) // report a record processing error to the context // depending on the error handling strategy settings: // * processing is stopped ( "errors.tolerance": "none" ) // * processing is continued and this record is sent to the dead letter ( "errors.tolerance": "all" ) context.errantRecordReporter().report(this, ex) null } protected fun SinkRecord.toHttpRequest() = (value() as Struct) .let { httpStruct -> HttpRequest.newBuilder() .uri(URI.create(httpStruct.getHttpUrl())) .method( httpStruct.getHttpMethod(), httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody() ) .apply { httpStruct.getHttpHeaders()?.forEach { (k, v) -> header(k, v) } } .build() } }
Как может выглядеть конфигурация таски:
{ "connector.class": "ru.typik.kafka.connect.HttpAsyncSinkConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "ru.typik.kafka.connect.converter.ProtobufConverter", "value.converter.protoClassName": "ru.typik.debt.proto.NotificationModel$NotificationData", "consumer.override.group.id": "${tpp:consumer-group}", "auto.create": "false", "tasks.max": "1", "topics": "${tpp:topic}", "errors.tolerance": "all", "errors.log.enable": true, "errors.log.include.messages": true, "errors.deadletterqueue.topic.name": "${tpp:deadLetter}", "errors.deadletterqueue.topic.replication.factor": "${tpp:replication-factor}", "errors.deadletterqueue.context.headers.enable": true, "transforms": "http", "transforms.http.type": "ru.typik.HttpTransform", "grouper.parallelCount": "50", "backoffTimeoutMs": "${tpp:backoffTimeoutMs}", "response.validator": "ru.typik.kafka.connect.task.StatusResponseValidator", "response.validator.codes.success": "200", "response.validator.codes.error": "400" }
Как может выглядеть реализация HttpTransform
:
class HttpTransform<R : ConnectRecord<R>> : Transformation<R> { override fun close() {} protected fun Struct.getMethod(): String = “GET” protected fun Struct.getUrl(): String = “http://localhost:8080” protected fun Struct.getBody(): String? = “{}” protected fun Struct.getHeaders(): Map<String, String>? = mapOf( "Content-Type" to "application/json", "Accept" to "application/json" ) override fun apply(record: R): R = record.newRecord( record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), HTTP_REQUEST_SCHEMA, (record.value() as Struct).let { struct -> Struct(HTTP_REQUEST_SCHEMA) .put(FIELD_HTTP_METHOD, struct.getMethod()) .put(FIELD_HTTP_URL, struct.getUrl()) .put(FIELD_HTTP_BODY, struct.getBody()) .put(FIELD_HTTP_HEADERS, struct.getHeaders()) }, record.timestamp() ) }
Я потестировал данную реализацию коннектора с различными значениями parallelCount
и построил графики в Graphana с использованием стандартных метрик Kafka Connect. В качестве Http Server взял wire mock, который отвечает с небольшой задержкой.
Конфигурация wiremock:
{ "mappings": [ { "priority": 1, "request": { "method": "GET", "urlPattern": "/" }, "response": { "status": 200, "fixedDelayMilliseconds": 200, "headers": { "Content-Type": "application/json" } } } ] }
Получившиеся графики:
Это достаточно условный сценарий тестирования, все зависит от реализации Http Server, будет ли выхлоп от оптимизации с параллельными запросами. Возможно для кого-то подойдет вариант с оптимизацией, которая позволяет группировать http запросы в батчи и слать их одним запросом, но в этом случае Http Server тоже должен быть готов к такой специфичной логике обработки.
ссылка на оригинал статьи https://habr.com/ru/articles/851916/
Добавить комментарий