Пишем свой Http Kafka Sink Connector

от автора

В данной статье приведу реализацию своего kafka http sink connector. Он не претендует на универсальность, но возможно поможет разобраться как разработать свой connector.

Confluent Http Sink Connector — платный, другие варианты с github мне не подошли. Про Kafka Connect можно почитать здесь. Статья предполагает, что есть понимание того зачем нужен Kafka Connect Framework и как его использовать. Представленный код написан на Kotlin.

Для начала зададим Schema для нашего коннектора:

val HTTP_REQUEST_SCHEMA: Schema = SchemaBuilder.struct()    .name(HTTP_REQUEST_SCHEMA_NAME)    .field(FIELD_HTTP_METHOD, Schema.STRING_SCHEMA)    .field(        FIELD_HTTP_HEADERS,        SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()    )    .field(FIELD_HTTP_BODY, Schema.OPTIONAL_STRING_SCHEMA)    .field(FIELD_HTTP_URL, Schema.STRING_SCHEMA)    .build() 

На вход коннектора должна обязательно поступать Struct со Schema = HTTP_REQUEST_SCHEMA, т.е. сообщения из Kafka с помощью converter и transforms должны быть приведены к Struct со схемой HTTP_REQUEST_SCHEMA.

Для реализации коннектора будем использовать стандартный java.net.http.HttpClient. Класс конфигурации для коннектора будет выглядеть так:

class HttpSinkConfig(private val props: Map<String, String>) : AbstractConfig(configDef, props) {     companion object {        private const val RESPONSE_VALIDATOR_CLASS_NAME = "response.validator"        private const val CONNECTION_TIMEOUT_MS = "connectionTimeoutMs"         val configDef = ConfigDef()            .define(                RESPONSE_VALIDATOR_CLASS_NAME,                ConfigDef.Type.STRING,                HttpSuccessStatusResponseValidator::class.java.name,                ConfigDef.Importance.HIGH,                "Class name of validator"            )            .define(                CONNECTION_TIMEOUT_MS,                ConfigDef.Type.LONG,                2000,                ConfigDef.Importance.HIGH,                "Http connection timeout in ms"            )    }     fun responseValidator(): HttpResponseValidator =       (Class.forName(getString(RESPONSE_VALIDATOR_CLASS_NAME)).getDeclaredConstructor().newInstance() as HttpResponseValidator)            .apply { init(props) }       fun httpClient(): HttpClient =        HttpClient.newBuilder()            .connectTimeout(Duration.ofMillis(connectionTimeoutMs()))            .build()      private fun connectionTimeoutMs(): Long = getLong(CONNECTION_TIMEOUT_MS) }

В данной реализации создается дефолтный HttpClient с одной лишь настройкой, но реализацию конфигурации можно расширить, добавив ssl свойства и другие специфичные свойства, которые необходимо добавить для настройки HttpClient. Для этого по аналогии с connectionTimeoutMs нужно объявить их в ConfigDef. 

responseValidator будет использоваться для валидации ответов, т.е. будет возможность задать валидатор, определяет какие ответы считать успешными, а какие ошибочными. Определим интерфейс для этих целей:

interface HttpResponseValidator : (HttpResponse<String>) -> Unit {    @Throws(RetriableException::class)    override fun invoke(response: HttpResponse<String>)    fun init(props: Map<String, String>) }

Приведем реализацию, которая классифицирует ответы относительно http response code. Задается 3 типа http response code:

  1. successCodes — трактуем ответ, как успешный

  2. retryCodes — трактуем ответ как временно ошибочный, при котором выполняется переотправка

  3. errorCodes — трактуем ответ как ошибочный, дальнейшее поведение зависит от настройки таски “error.tolerance”: all — обработка продолжается, none — обработка сообщений завершается

class StatusResponseValidator : HttpResponseValidator {    private lateinit var retryCodes: List<Int>    private lateinit var successCodes: List<Int>    private lateinit var errorCodes: List<Int>      override fun invoke(response: HttpResponse<String>) {        if (response.statusCode() !in successCodes) {            if (response.statusCode() in retryCodes || retryCodes.isEmpty() && response.statusCode() !in errorCodes)                throw RetriableException("Status $response.statusCode() is not success $successCodes")            else throw IllegalArgumentException("Status $response.statusCode() is not success $successCodes")        }    }     override fun init(props: Map<String, String>) {        val config = AbstractConfig(            ConfigDef()                .define(                    RETRY_CODES,                    ConfigDef.Type.LIST,                    listOf<String>(),                    ConfigDef.Importance.LOW,                    "Http response codes for retry"                )                .define(                    SUCCESS_CODES,                    ConfigDef.Type.LIST,                    listOf("200"),                    ConfigDef.Importance.HIGH,                    "Success http response codes"                )                .define(                    ERROR_CODES,                    ConfigDef.Type.LIST,                    listOf<String>(),                    ConfigDef.Importance.HIGH,                    "Error http response codes"                ),            props        )        retryCodes = config.getList(RETRY_CODES).map { it.toInt() }        successCodes = config.getList(SUCCESS_CODES).map { it.toInt() }        errorCodes = config.getList(ERROR_CODES).map { it.toInt() }    }     companion object {        private const val RETRY_CODES = "response.validator.codes.retry"        private const val SUCCESS_CODES = "response.validator.codes.success"        private const val ERROR_CODES = "response.validator.codes.error"    } } 

Пример того, сконфигурировать response validator в настройках таски:

{   .... "response.validator": "ru.typik.kafka.connect.task.StatusResponseValidator", "response.validator.codes.success": "200", "response.validator.codes.error": "400",   .... }

Самая простая реализация таски с HttpClient будет выглядеть так:

class HttpSinkTask : SinkTask() {     companion object {        private val log = LoggerFactory.getLogger(this::class.java)    }     protected lateinit var config: HttpSinkConfig    protected lateinit var httpClient: HttpClient    protected lateinit var responseValidator: HttpResponseValidator     override fun version(): String = "1.0"    override fun stop() {}    override fun flush(currentOffsets: MutableMap<TopicPartition, OffsetAndMetadata>) { }     override fun start(props: Map<String, String>) {        log.info("Starting http sink task...")        config = HttpSinkConfig(props)        httpClient = config.httpClient()        responseValidator = config.responseValidator()    }     override fun put(records: Collection<SinkRecord>) {        if (records.isEmpty()) return         log.debug(            "Received {} records. First record kafka coordinates:({}-{}-{}).",            records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset()        )         records.forEach { record ->                record.toHttpRequestSafe()                    ?.let { request ->                        request.send()                            .also { response ->                                log.trace("Http request: {}, Http response: {}", request, response)                                response.validate(record)                            }                    }        }    }     private fun HttpRequest.send() = try {        httpClient.send(this, HttpResponse.BodyHandlers.ofString())    } catch (ex: Exception) {        // Сообщения будут переобработаны через backoffTimeoutMs        log.error("Error during sending http request: $this", ex)        log.info("Context timeout before retry")        context.timeout(config.backoffTimeoutMs())        log.info("Throw exception after context timeout")        throw RetriableException(ex)    }     protected fun HttpResponse<String>.validate(record: SinkRecord) {        try {            responseValidator(this)        } catch (ex: RetriableException) {            // Сообщения будут переобработаны через backoffTimeoutMs            log.info("Context timeout before retry")            context.timeout(config.backoffTimeoutMs())            log.info("Throw exception after timeout")            throw ex        } catch (ex: Exception) {            log.error("Matching response failed", ex)            // Поведение зависит от настройки таски errors.tolerance:            // * Обработка завершается, если "errors.tolerance": "none"            // * Обработка продолжается, а сообщения отправляются в dead letter,            //    если "errors.tolerance": "all"             context.errantRecordReporter().report(record, ex)        }    }     protected fun SinkRecord.toHttpRequestSafe() = try {        toHttpRequest()    } catch (ex: Exception) {        log.error("Invalid record", ex)        // Поведение зависит от настройки таски errors.tolerance:            // * Обработка завершается, если "errors.tolerance": "none"            // * Обработка продолжается, а сообщения отправляются в dead letter,            //    если "errors.tolerance": "all"         context.errantRecordReporter().report(this, ex)        null    }     protected fun SinkRecord.toHttpRequest() =        (value() as Struct)            .let { httpStruct ->                HttpRequest.newBuilder()                    .uri(URI.create(httpStruct.getHttpUrl()))                    .method(                        httpStruct.getHttpMethod(),                        httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody()                    )                    .apply {                        httpStruct.getHttpHeaders()?.forEach { (k, v) -> header(k, v) }                    }                    .build()            } } 

Реализация класса коннектора:

class HttpSinkConnector : SinkConnector() {     private val log = LoggerFactory.getLogger(HttpSinkConnector::class.java)    private lateinit var settings: Map<String, String>     override fun version(): String = "1.0"     override fun start(props: MutableMap<String, String>) {        log.info("Starting HttpSyncSinkConnector...")        settings = props    }     override fun taskClass(): Class<out Task> = HabrHttpSinkTask::class.java    override fun taskConfigs(maxTasks: Int): List<Map<String, String>> =        List(maxTasks) { settings }    override fun stop() {}    override fun config(): ConfigDef = HttpSinkConfig.configDef     override fun validate(connectorConfigs: Map<String, String>): Config {        return super.validate(connectorConfigs)    } } 

Была приведена простая реализация коннектора, где сообщения будут обрабатываться строго одно за одним. Попробуем немного оптимизировать данную реализацию за счет использования асинхронного метода HttpClient.sendAsync вместо синхронного HttpClient.send. Идея в том, чтобы посылать несколько запросов параллельно, обрабатывая сообщения пачками. Такой подход в некоторых случаях может быть более оптимальным, если это предусмотрено реализацией Http Server.

Чтобы иметь возможность конфигурировать максимальное количество параллельных запросов приведем интерфейс и реализацию SinkRecordGrouper, который делит список входящих сообщений на входе метода SinkTask.put на подсписки, которые в свою очередь обрабатываются строго последовательно, при этом элементы подсписков обрабатываются параллельно:

interface SinkRecordGrouper : (List<SinkRecord>) -> List<List<SinkRecord>> {    override fun invoke(records: List<SinkRecord>): List<List<SinkRecord>>    fun init(props: Map<String, String>) }

Приведем реализацию, которая кроме собственно разбивки на подсписки по количеству элементов, еще и группирует их по ключу, т.е. элементы с одинаковыми ключами помещаются в разные подсписки, чтобы они не обрабатывались параллельно, а строго последовательно, в некоторых случаях такая логика необходима.

class KeyGrouper : SinkRecordGrouper {     private var parallelCount by Delegates.notNull<Long>()     override fun invoke(records: List<SinkRecord>): List<List<SinkRecord>> {        val result = mutableListOf<List<SinkRecord>>()        val batch = mutableListOf(*records.toTypedArray())         while (batch.isNotEmpty()) {            val keySet = mutableSetOf<Any>()            val subResult = mutableListOf<SinkRecord>()            for (r in batch) {                if (r.key() !in keySet) {                    subResult.add(r)                    keySet.add(r.key())                }                 if (subResult.size >= parallelCount)                    break            }            batch.removeAll(subResult)            result.add(subResult)        }        return result    }      override fun init(props: Map<String, String>) {        val config = AbstractConfig(            ConfigDef()                .define(                    PARALLEL_COUNT,                    ConfigDef.Type.LONG,                    5,                    ConfigDef.Importance.LOW,                    "How many requests to send in parallel"                ),            props        )        parallelCount = config.getLong(PARALLEL_COUNT)    }     companion object {        private const val PARALLEL_COUNT = "grouper.parallelCount"    } } 

Приведем обновленную реализацию HttpSinkTask с использованием HttpClient.sendAsync и KeyGrouper:

class HttpSinkTask : SinkTask() {     companion object {        private val log = LoggerFactory.getLogger(this::class.java)    }     private lateinit var config: HttpSinkConfig    private lateinit var httpClient: HttpClient    private lateinit var responseValidator: HttpResponseValidator    private lateinit var grouper: SinkRecordGrouper       override fun version(): String = "1.0"    override fun stop() {}    override fun flush(currentOffsets: MutableMap<TopicPartition, OffsetAndMetadata>) {    }     override fun start(props: Map<String, String>) {        log.info("Starting http sink task...")        config = HttpSinkConfig(props)        httpClient = config.httpClient()        responseValidator = config.responseValidator()        grouper = KeyGrouper().apply { init(props) }    }      override fun put(records: Collection<SinkRecord>) {        if (records.isEmpty()) return          log.debug(            "Received {} records. First record kafka coordinates:({}-{}-{}).",            records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset()        )               grouper(records.toList()).forEach { subRecords ->            subRecords.map { record ->                record.toHttpRequestSafe()                    ?.let { request ->                        request.sendAsync()                            ?.exceptionally { ex ->                                // Батч будет переобработан через backoffTimeoutMs                                log.error("Error handling response, http request: $this", ex)                                log.info("Context timeout before retry")                                context.timeout(config.backoffTimeoutMs())                                log.info("Throw exception after context timeout")                                throw RetriableException(ex)                              }                            ?.thenApply { response -> response.validate(record) }                    }            }                .forEach {                    try {                        it?.join()                    } catch (ex: CompletionException) {                        ex.cause?.let { throw it }                    }                }        }    }      private fun HttpRequest.sendAsync() = try {        httpClient.sendAsync(this, HttpResponse.BodyHandlers.ofString())    } catch (ex: Exception) {        // Батч будет переработа после backoffTimeoutMs        log.error("Error during sending http request: $this", ex)        log.info("Context timeout before retry")        context.timeout(config.backoffTimeoutMs())        log.info("Throw exception after context timeout")        throw RetriableException(ex)    }     protected fun HttpResponse<String>.validate(record: SinkRecord) {        try {            responseValidator(this)        } catch (ex: RetriableException) {            // Сообщения будут переобработаны через backoffTimeoutMs            log.info("Context timeout before retry")            context.timeout(config.backoffTimeoutMs())            log.info("Throw exception after timeout")            throw ex        } catch (ex: Exception) {            log.error("Matching response failed", ex)            // Поведение зависит от настройки таски errors.tolerance:            // * Обработка завершается, если "errors.tolerance": "none"            // * Обработка продолжается, а сообщения отправляются в dead letter,            //    если "errors.tolerance": "all" )            context.errantRecordReporter().report(record, ex)        }    }     protected fun SinkRecord.toHttpRequestSafe() = try {        toHttpRequest()    } catch (ex: Exception) {        log.error("Invalid record", ex)        // report a record processing error to the context        // depending on the error handling strategy settings:        // * processing is stopped ( "errors.tolerance": "none" )        // * processing is continued and this record is sent to the dead letter ( "errors.tolerance": "all" )        context.errantRecordReporter().report(this, ex)        null    }     protected fun SinkRecord.toHttpRequest() =        (value() as Struct)            .let { httpStruct ->                HttpRequest.newBuilder()                    .uri(URI.create(httpStruct.getHttpUrl()))                    .method(                        httpStruct.getHttpMethod(),                        httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody()                    )                    .apply {                        httpStruct.getHttpHeaders()?.forEach { (k, v) -> header(k, v) }                    }                    .build()            } } 

Как может выглядеть конфигурация таски:

{  "connector.class": "ru.typik.kafka.connect.HttpAsyncSinkConnector",  "key.converter": "org.apache.kafka.connect.storage.StringConverter",  "value.converter": "ru.typik.kafka.connect.converter.ProtobufConverter",  "value.converter.protoClassName": "ru.typik.debt.proto.NotificationModel$NotificationData",  "consumer.override.group.id": "${tpp:consumer-group}",  "auto.create": "false",  "tasks.max": "1",  "topics": "${tpp:topic}",  "errors.tolerance": "all",  "errors.log.enable": true,  "errors.log.include.messages": true,  "errors.deadletterqueue.topic.name": "${tpp:deadLetter}",  "errors.deadletterqueue.topic.replication.factor": "${tpp:replication-factor}",  "errors.deadletterqueue.context.headers.enable": true,  "transforms": "http",  "transforms.http.type": "ru.typik.HttpTransform",  "grouper.parallelCount": "50",  "backoffTimeoutMs": "${tpp:backoffTimeoutMs}",  "response.validator": "ru.typik.kafka.connect.task.StatusResponseValidator",  "response.validator.codes.success": "200",  "response.validator.codes.error": "400" } 

Как может выглядеть реализация HttpTransform:

class HttpTransform<R : ConnectRecord<R>> : Transformation<R> {     override fun close() {}     protected fun Struct.getMethod(): String = “GET”    protected fun Struct.getUrl(): String = “http://localhost:8080”    protected fun Struct.getBody(): String? = “{}”    protected fun Struct.getHeaders(): Map<String, String>? = mapOf(        "Content-Type" to "application/json",        "Accept" to "application/json"    )     override fun apply(record: R): R =        record.newRecord(            record.topic(),            record.kafkaPartition(),            record.keySchema(),            record.key(),            HTTP_REQUEST_SCHEMA,            (record.value() as Struct).let { struct ->                Struct(HTTP_REQUEST_SCHEMA)                    .put(FIELD_HTTP_METHOD, struct.getMethod())                    .put(FIELD_HTTP_URL, struct.getUrl())                    .put(FIELD_HTTP_BODY, struct.getBody())                    .put(FIELD_HTTP_HEADERS, struct.getHeaders())            },            record.timestamp()        ) }

Я потестировал данную реализацию коннектора с различными значениями parallelCount и построил графики в Graphana с использованием стандартных метрик Kafka Connect. В качестве Http Server взял wire mock, который отвечает с небольшой задержкой. 

Конфигурация wiremock:

{  "mappings": [    {      "priority": 1,      "request": {        "method": "GET",        "urlPattern": "/"      },      "response": {        "status": 200,        "fixedDelayMilliseconds": 200,        "headers": {          "Content-Type": "application/json"        }      }    }  ] }

Получившиеся графики:

Нагрузочное тестирование

Нагрузочное тестирование

Это достаточно условный сценарий тестирования, все зависит от реализации Http Server, будет ли выхлоп от оптимизации с параллельными запросами. Возможно для кого-то подойдет вариант с оптимизацией, которая позволяет группировать http запросы в батчи и слать их одним запросом, но в этом случае Http Server тоже должен быть готов к такой специфичной логике обработки.


ссылка на оригинал статьи https://habr.com/ru/articles/851916/