{"id":436012,"date":"2024-10-19T15:01:29","date_gmt":"2024-10-19T15:01:29","guid":{"rendered":"http:\/\/savepearlharbor.com\/?p=436012"},"modified":"-0001-11-30T00:00:00","modified_gmt":"-0001-11-29T21:00:00","slug":"","status":"publish","type":"post","link":"https:\/\/savepearlharbor.com\/?p=436012","title":{"rendered":"<span>Writing Our Own HTTP Kafka Sink Connector<\/span>"},"content":{"rendered":"<div><!--[--><!--]--><\/div>\n<div id=\"post-content-body\">\n<div>\n<div class=\"article-formatted-body article-formatted-body article-formatted-body_version-2\">\n<div xmlns=\"http:\/\/www.w3.org\/1999\/xhtml\">\n<p>In this article I present an implementation of my own Kafka HTTP sink connector. It does not claim to be universal, but it may help you understand how to develop a connector of your own.<\/p>\n<p>The Confluent HTTP Sink Connector is a paid product, and the other options I found on GitHub did not suit me. 
You can read about Kafka Connect <a href=\"https:\/\/docs.confluent.io\/platform\/current\/connect\/userguide.html\" rel=\"noopener noreferrer nofollow\">here<\/a>. This article assumes that you understand why the Kafka Connect framework is needed and how to use it. The code shown is written in Kotlin.<\/p>\n<p>First, let's define the Schema for our connector:<\/p>\n<pre><code class=\"kotlin\">\/\/ HTTP_REQUEST_SCHEMA_NAME and the FIELD_HTTP_* names are String constants that the article does not show\nval HTTP_REQUEST_SCHEMA: Schema = SchemaBuilder.struct()\n    .name(HTTP_REQUEST_SCHEMA_NAME)\n    .field(FIELD_HTTP_METHOD, Schema.STRING_SCHEMA)\n    .field(\n        FIELD_HTTP_HEADERS,\n        SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()\n    )\n    .field(FIELD_HTTP_BODY, Schema.OPTIONAL_STRING_SCHEMA)\n    .field(FIELD_HTTP_URL, Schema.STRING_SCHEMA)\n    .build()<\/code><\/pre>\n<p>The connector must receive a Struct whose Schema is HTTP_REQUEST_SCHEMA, that is, 
messages from Kafka must be converted by the converter and transforms into a Struct with the HTTP_REQUEST_SCHEMA schema.<\/p>\n<p>To implement the connector we will use the standard java.net.http.HttpClient. The connector's configuration class looks like this:<\/p>\n<pre><code class=\"kotlin\">class HttpSinkConfig(private val props: Map&lt;String, String&gt;) : AbstractConfig(configDef, props) {\n\n    companion object {\n        private const val RESPONSE_VALIDATOR_CLASS_NAME = \"response.validator\"\n        private const val CONNECTION_TIMEOUT_MS = \"connectionTimeoutMs\"\n        private const val BACKOFF_TIMEOUT_MS = \"backoffTimeoutMs\"\n\n        val configDef = ConfigDef()\n            .define(\n                RESPONSE_VALIDATOR_CLASS_NAME,\n                ConfigDef.Type.STRING,\n                \/\/ Default validator class; its source is not shown in the article\n                HttpSuccessStatusResponseValidator::class.java.name,\n                ConfigDef.Importance.HIGH,\n                \"Class name of validator\"\n            )\n            .define(\n                CONNECTION_TIMEOUT_MS,\n                ConfigDef.Type.LONG,\n                2000L,\n                ConfigDef.Importance.HIGH,\n                \"Http connection timeout in ms\"\n            )\n            \/\/ Read by the task through backoffTimeoutMs(); declared without a default, so it must be set\n            .define(\n                BACKOFF_TIMEOUT_MS,\n                ConfigDef.Type.LONG,\n                ConfigDef.Importance.HIGH,\n                \"Delay in ms before failed records are retried\"\n            )\n    }\n\n    \/\/ Instantiates the configured validator through its no-arg constructor\n    fun responseValidator(): HttpResponseValidator =\n        (Class.forName(getString(RESPONSE_VALIDATOR_CLASS_NAME)).getDeclaredConstructor().newInstance() as 
HttpResponseValidator)\n            .apply { init(props) }\n\n    fun httpClient(): HttpClient =\n        HttpClient.newBuilder()\n            .connectTimeout(Duration.ofMillis(connectionTimeoutMs()))\n            .build()\n\n    fun backoffTimeoutMs(): Long = getLong(BACKOFF_TIMEOUT_MS)\n\n    private fun connectionTimeoutMs(): Long = getLong(CONNECTION_TIMEOUT_MS)\n}<\/code><\/pre>\n<p>This implementation creates a default HttpClient with a single setting, but the configuration can be extended with SSL properties and any other specific properties needed to set up HttpClient. To do so, declare them in ConfigDef by analogy with connectionTimeoutMs.<\/p>\n<p>responseValidator is used to validate responses: 
a validator can be supplied that decides which responses count as successful and which as errors. Let's define an interface for this purpose:<\/p>\n<pre><code class=\"kotlin\">interface HttpResponseValidator : (HttpResponse&lt;String&gt;) -&gt; Unit {\n    \/\/ Throws RetriableException when the request should be sent again\n    @Throws(RetriableException::class)\n    override fun invoke(response: HttpResponse&lt;String&gt;)\n\n    fun init(props: Map&lt;String, String&gt;)\n}<\/code><\/pre>\n<p>Here is an implementation that classifies responses by their HTTP response code. 
Three kinds of HTTP response codes are configured:<\/p>\n<ol>\n<li>\n<p>successCodes: the response is treated as successful<\/p>\n<\/li>\n<li>\n<p>retryCodes: the response is treated as a temporary failure, and the request is sent again<\/p>\n<\/li>\n<li>\n<p>errorCodes: the response is treated as an error; what happens next depends on the \"errors.tolerance\" task setting: with all, processing continues; with none, message processing stops<\/p>\n<\/li>\n<\/ol>\n<pre><code class=\"kotlin\">class StatusResponseValidator : HttpResponseValidator {\n    private lateinit var retryCodes: List&lt;Int&gt;\n    private lateinit var successCodes: List&lt;Int&gt;\n    private lateinit var errorCodes: List&lt;Int&gt;\n\n    override fun invoke(response: HttpResponse&lt;String&gt;) {\n        if (response.statusCode() !in successCodes) {\n            \/\/ Retry on an explicit retry code, or on any non-error code when no retry codes are configured\n            if (response.statusCode() in retryCodes || 
retryCodes.isEmpty() &amp;&amp; response.statusCode() !in errorCodes)\n                throw RetriableException(\"Status ${response.statusCode()} is not success $successCodes\")\n            else throw IllegalArgumentException(\"Status ${response.statusCode()} is not success $successCodes\")\n        }\n    }\n\n    override fun init(props: Map&lt;String, String&gt;) {\n        val config = AbstractConfig(\n            ConfigDef()\n                .define(\n                    RETRY_CODES,\n                    ConfigDef.Type.LIST,\n                    listOf&lt;String&gt;(),\n                    ConfigDef.Importance.LOW,\n                    \"Http response codes for retry\"\n                )\n                .define(\n                    SUCCESS_CODES,\n                    ConfigDef.Type.LIST,\n                    listOf(\"200\"),\n                    ConfigDef.Importance.HIGH,\n                    \"Success http response codes\"\n                )\n                .define(\n                    ERROR_CODES,\n                    ConfigDef.Type.LIST,\n                    listOf&lt;String&gt;(),\n                    ConfigDef.Importance.HIGH,\n                    \"Error http response codes\"\n                ),\n            props\n        )\n        retryCodes = config.getList(RETRY_CODES).map { it.toInt() }\n        successCodes = config.getList(SUCCESS_CODES).map { it.toInt() }\n        errorCodes = config.getList(ERROR_CODES).map { it.toInt() }\n    }\n\n    companion object {\n        private const val RETRY_CODES = \"response.validator.codes.retry\"\n        private const val SUCCESS_CODES = \"response.validator.codes.success\"\n        private const val ERROR_CODES = \"response.validator.codes.error\"\n    }\n}<\/code><\/pre>\n<p>An example of how to configure the response validator in the task settings:<\/p>\n<pre><code class=\"json\">{   
....\n  \"response.validator\": \"ru.typik.kafka.connect.task.StatusResponseValidator\",\n  \"response.validator.codes.success\": \"200\",\n  \"response.validator.codes.error\": \"400\",\n  ....\n}<\/code><\/pre>\n<p>The simplest implementation of the task with HttpClient looks like this:<\/p>\n<pre><code class=\"kotlin\">class HttpSinkTask : SinkTask() {\n\n    companion object {\n        private val log = LoggerFactory.getLogger(HttpSinkTask::class.java)\n    }\n\n    protected lateinit var config: HttpSinkConfig\n    protected lateinit var httpClient: HttpClient\n    protected lateinit var responseValidator: HttpResponseValidator\n\n    override fun version(): String = \"1.0\"\n    override fun stop() {}\n    override fun flush(currentOffsets: MutableMap&lt;TopicPartition, OffsetAndMetadata&gt;) { }\n\n    override fun start(props: Map&lt;String, String&gt;) {\n        log.info(\"Starting http sink task...\")\n        config = HttpSinkConfig(props)\n        httpClient = config.httpClient()\n        responseValidator = config.responseValidator()\n    }\n\n    override fun put(records: Collection&lt;SinkRecord&gt;) {\n        if (records.isEmpty()) return\n\n        log.debug(\n            \"Received {} records. 
First record kafka coordinates:({}-{}-{}).\",\n            records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset()\n        )\n\n        \/\/ Records are sent strictly one after another\n        records.forEach { record -&gt;\n            record.toHttpRequestSafe()\n                ?.let { request -&gt;\n                    request.send()\n                        .also { response -&gt;\n                            log.trace(\"Http request: {}, Http response: {}\", request, response)\n                            response.validate(record)\n                        }\n                }\n        }\n    }\n\n    private fun HttpRequest.send() = try {\n        httpClient.send(this, HttpResponse.BodyHandlers.ofString())\n    } catch (ex: Exception) {\n        \/\/ The records will be reprocessed after backoffTimeoutMs\n        log.error(\"Error during sending http request: $this\", ex)\n        log.info(\"Context timeout before retry\")\n        context.timeout(config.backoffTimeoutMs())\n        log.info(\"Throw exception after context timeout\")\n        throw RetriableException(ex)\n    }\n\n    protected fun HttpResponse&lt;String&gt;.validate(record: SinkRecord) {\n        try {\n            responseValidator(this)\n        } catch (ex: RetriableException) {\n            \/\/ The records will be reprocessed after backoffTimeoutMs\n            log.info(\"Context timeout before retry\")\n            context.timeout(config.backoffTimeoutMs())\n            log.info(\"Throw exception after timeout\")\n            throw ex\n        } catch (ex: Exception) {\n            log.error(\"Matching response failed\", ex)\n            \/\/ What happens next 
depends on the errors.tolerance task setting:\n            \/\/ * processing stops if \"errors.tolerance\": \"none\"\n            \/\/ * processing continues and the record is sent to the dead letter queue\n            \/\/   if \"errors.tolerance\": \"all\"\n            context.errantRecordReporter().report(record, ex)\n        }\n    }\n\n    protected fun SinkRecord.toHttpRequestSafe() = try {\n        toHttpRequest()\n    } catch (ex: Exception) {\n        log.error(\"Invalid record\", ex)\n        \/\/ What happens next depends on the errors.tolerance task setting:\n        \/\/ * processing stops if \"errors.tolerance\": \"none\"\n        \/\/ * processing continues and the record is sent to the dead letter queue\n        \/\/   if \"errors.tolerance\": \"all\"\n        context.errantRecordReporter().report(this, ex)\n        null\n    }\n\n    protected fun SinkRecord.toHttpRequest() =\n        (value() as Struct)\n            .let { httpStruct -&gt;\n                HttpRequest.newBuilder()\n                    
.uri(URI.create(httpStruct.getHttpUrl()))\n                    .method(\n                        httpStruct.getHttpMethod(),\n                        httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody()\n                    )\n                    .apply {\n                        httpStruct.getHttpHeaders()?.forEach { (k, v) -&gt; header(k, v) }\n                    }\n                    .build()\n            }\n}<\/code><\/pre>\n<p>The connector class implementation:<\/p>\n<pre><code class=\"kotlin\">class HttpSinkConnector : SinkConnector() {\n\n    private val log = LoggerFactory.getLogger(HttpSinkConnector::class.java)\n    private lateinit var settings: Map&lt;String, String&gt;\n\n    override fun version(): String = \"1.0\"\n\n    override fun start(props: MutableMap&lt;String, String&gt;) {\n        log.info(\"Starting HttpSinkConnector...\")\n        settings = props\n    }\n\n    override fun taskClass(): Class&lt;out Task&gt; = HttpSinkTask::class.java\n\n    \/\/ Every task receives the same settings\n    override fun taskConfigs(maxTasks: Int): List&lt;Map&lt;String, String&gt;&gt; =\n        List(maxTasks) { settings }\n\n    override fun stop() {}\n    override fun config(): ConfigDef = HttpSinkConfig.configDef\n\n    override fun validate(connectorConfigs: Map&lt;String, String&gt;): Config {\n        return super.validate(connectorConfigs)\n    }\n}<\/code><\/pre>\n<p>This was a simple connector implementation in which messages are processed strictly one 
after another. Let's try to optimize it a little by using the asynchronous method <code>HttpClient.sendAsync<\/code> instead of the synchronous <code>HttpClient.send<\/code>. The idea is to send several requests in parallel, processing messages in batches. 
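<\/p>\n<p>The batching idea can be sketched without Kafka Connect. In the sketch below, <code>fakeSendAsync<\/code> is a hypothetical stand-in for <code>HttpClient.sendAsync<\/code> and <code>sendInBatches<\/code> is an illustrative helper; neither name comes from the connector code. Each batch is started in parallel and fully awaited before the next batch begins:<\/p>\n<pre><code class=\"kotlin\">import java.util.concurrent.CompletableFuture\n\n\/\/ Hypothetical stand-in for HttpClient.sendAsync: completes on another thread\nfun fakeSendAsync(request: String): CompletableFuture&lt;String&gt; =\n    CompletableFuture.supplyAsync { \"response to $request\" }\n\n\/\/ Illustrative helper: batches run one after another, requests inside a batch run in parallel\nfun sendInBatches(requests: List&lt;String&gt;, parallelCount: Int): List&lt;String&gt; =\n    requests.chunked(parallelCount).flatMap { batch -&gt;\n        val inFlight = batch.map { fakeSendAsync(it) } \/\/ start every request of the batch\n        inFlight.map { it.join() }                     \/\/ wait for the whole batch before the next one\n    }\n\nfun main() {\n    \/\/ Batches: [a, b] in parallel, then [c]\n    println(sendInBatches(listOf(\"a\", \"b\", \"c\"), parallelCount = 2))\n    \/\/ prints [response to a, response to b, response to c]\n}<\/code><\/pre>\n<p>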
This approach can be more efficient in some cases, provided the HTTP server implementation supports it.<\/p>\n<p>To make the maximum number of parallel requests configurable, here are the interface and an implementation of <code>SinkRecordGrouper<\/code>, which splits the list of records passed to <code>SinkTask.put<\/code> into sublists. The sublists are processed strictly one after another, while 
the elements of each sublist are processed in parallel:<\/p>\n<pre><code class=\"kotlin\">interface SinkRecordGrouper : (List&lt;SinkRecord&gt;) -&gt; List&lt;List&lt;SinkRecord&gt;&gt; {\n    override fun invoke(records: List&lt;SinkRecord&gt;): List&lt;List&lt;SinkRecord&gt;&gt;\n    fun init(props: Map&lt;String, String&gt;)\n}<\/code><\/pre>\n<p>Here is an implementation that, besides splitting the records into sublists by element count, also groups them by key: 
records with the same key are placed in different sublists so that they are processed strictly sequentially rather than in parallel. Some use cases require this.<\/p>\n<pre><code class=\"kotlin\">class KeyGrouper : SinkRecordGrouper {\n\n    private var parallelCount by Delegates.notNull&lt;Long&gt;()\n\n    override fun invoke(records: List&lt;SinkRecord&gt;): List&lt;List&lt;SinkRecord&gt;&gt; {\n        val result = mutableListOf&lt;List&lt;SinkRecord&gt;&gt;()\n        val batch = records.toMutableList()\n\n        while (batch.isNotEmpty()) {\n            \/\/ Keys already placed in the current sublist (a record key may be null)\n            val keySet = mutableSetOf&lt;Any?&gt;()\n            val subResult = mutableListOf&lt;SinkRecord&gt;()\n            for (r in batch) {\n                if (r.key() !in keySet) {\n                    subResult.add(r)\n                    keySet.add(r.key())\n                }\n\n                if (subResult.size &gt;= parallelCount)\n                    break\n            }\n            \/\/ Records left in batch go to a later sublist, which keeps per-key order\n            batch.removeAll(subResult)\n            result.add(subResult)\n        }\n        return result\n    }\n\n    override fun init(props: Map&lt;String, String&gt;) {\n        val config = AbstractConfig(\n            ConfigDef()\n                .define(\n                    PARALLEL_COUNT,\n                    ConfigDef.Type.LONG, 
5L,\n                    ConfigDef.Importance.LOW,\n                    \"How many requests to send in parallel\"\n                ),\n            props\n        )\n        parallelCount = config.getLong(PARALLEL_COUNT)\n    }\n\n    companion object {\n        private const val PARALLEL_COUNT = \"grouper.parallelCount\"\n    }\n}<\/code><\/pre>\n<p>Here is the updated implementation of <code>HttpSinkTask<\/code> that uses <code>HttpClient.sendAsync<\/code> and <code>KeyGrouper<\/code>:<\/p>\n<pre><code class=\"kotlin\">class HttpSinkTask : SinkTask() {\n\n    companion object {\n        private val log = LoggerFactory.getLogger(HttpSinkTask::class.java)\n    }\n\n    private lateinit var config: HttpSinkConfig\n    private lateinit var httpClient: HttpClient\n    private lateinit var responseValidator: HttpResponseValidator\n    private lateinit var grouper: SinkRecordGrouper\n\n    override fun version(): String = \"1.0\"\n    override fun stop() {}\n    override fun flush(currentOffsets: MutableMap&lt;TopicPartition, OffsetAndMetadata&gt;) {\n    }\n\n    override fun start(props: Map&lt;String, String&gt;) {\n        log.info(\"Starting http sink task...\")\n        config = HttpSinkConfig(props)\n        httpClient = config.httpClient()\n        responseValidator = config.responseValidator()\n        grouper = KeyGrouper().apply { init(props) }\n    }\n\n    override fun put(records: Collection&lt;SinkRecord&gt;) {\n        if (records.isEmpty()) return\n\n        log.debug(\n            \"Received {} records. 
First record kafka coordinates:({}-{}-{}).\",\n            records.size, records.first().topic(), records.first().kafkaPartition(), records.first().kafkaOffset()\n        )\n\n        grouper(records.toList()).forEach { subRecords -&gt;\n            \/\/ Start all requests of the sublist first, then wait for every one of them\n            subRecords.map { record -&gt;\n                record.toHttpRequestSafe()\n                    ?.let { request -&gt;\n                        request.sendAsync()\n                            ?.exceptionally { ex -&gt;\n                                \/\/ The batch will be reprocessed after backoffTimeoutMs\n                                log.error(\"Error handling response, http request: $request\", ex)\n                                log.info(\"Context timeout before retry\")\n                                context.timeout(config.backoffTimeoutMs())\n                                log.info(\"Throw exception after context timeout\")\n                                throw RetriableException(ex)\n                            }\n                            ?.thenApply { response -&gt; response.validate(record) }\n                    }\n            }\n                .forEach { future -&gt;\n                    try {\n                        future?.join()\n                    } catch (ex: CompletionException) {\n                        \/\/ Rethrow the original cause so that a RetriableException reaches Kafka Connect\n                        throw ex.cause ?: ex\n                    }\n                }\n        }\n    }\n\n    private fun HttpRequest.sendAsync() = try {\n        httpClient.sendAsync(this, HttpResponse.BodyHandlers.ofString())\n    } catch (ex: Exception) {\n        \/\/ The batch will be reprocessed after backoffTimeoutMs\n        log.error(\"Error during sending http request: $this\", ex)\n        log.info(\"Context timeout before retry\")\n        context.timeout(config.backoffTimeoutMs())\n        log.info(\"Throw exception after context 
timeout\")\n        throw RetriableException(ex)\n    }\n\n    protected fun HttpResponse&lt;String&gt;.validate(record: SinkRecord) {\n        try {\n            responseValidator(this)\n        } catch (ex: RetriableException) {\n            \/\/ The records will be reprocessed after backoffTimeoutMs\n            log.info(\"Context timeout before retry\")\n            context.timeout(config.backoffTimeoutMs())\n            log.info(\"Throw exception after timeout\")\n            throw ex\n        } catch (ex: Exception) {\n            log.error(\"Matching response failed\", ex)\n            \/\/ What happens next depends on the errors.tolerance task setting:\n            \/\/ * processing stops if \"errors.tolerance\": \"none\"\n            \/\/ * processing continues and the record is sent to the dead letter queue\n            \/\/   if \"errors.tolerance\": \"all\"\n            context.errantRecordReporter().report(record, ex)\n        }\n    }\n\n    protected fun SinkRecord.toHttpRequestSafe() = try {\n        toHttpRequest()\n    } catch (ex: Exception) {\n        log.error(\"Invalid record\", ex)\n        \/\/ report a record processing error to the context\n        \/\/ depending on the error handling strategy settings:\n        \/\/ * processing is stopped ( \"errors.tolerance\": \"none\" )\n        \/\/ * processing is continued and this 
record is sent to the dead letter ( \"errors.tolerance\": \"all\" )        context.errantRecordReporter().report(this, ex)        null    }     protected fun SinkRecord.toHttpRequest() =        (value() as Struct)            .let { httpStruct -&gt;                HttpRequest.newBuilder()                    .uri(URI.create(httpStruct.getHttpUrl()))                    .method(                        httpStruct.getHttpMethod(),                        httpStruct.getHttpBody()?.let { HttpRequest.BodyPublishers.ofString(it) } ?: HttpRequest.BodyPublishers.noBody()                    )                    .apply {                        httpStruct.getHttpHeaders()?.forEach { (k, v) -&gt; header(k, v) }                    }                    .build()            } } <\/code><\/pre>\n<p>What the task configuration might look like:<\/p>\n<pre><code class=\"json\">{  \"connector.class\": \"ru.typik.kafka.connect.HttpAsyncSinkConnector\",  \"key.converter\": \"org.apache.kafka.connect.storage.StringConverter\",  \"value.converter\": \"ru.typik.kafka.connect.converter.ProtobufConverter\",  \"value.converter.protoClassName\": \"ru.typik.debt.proto.NotificationModel$NotificationData\",  \"consumer.override.group.id\": \"${tpp:consumer-group}\",  \"auto.create\": \"false\",  \"tasks.max\": \"1\",  \"topics\": \"${tpp:topic}\",  \"errors.tolerance\": \"all\",  \"errors.log.enable\": true,  \"errors.log.include.messages\": true,  \"errors.deadletterqueue.topic.name\": \"${tpp:deadLetter}\",  \"errors.deadletterqueue.topic.replication.factor\": \"${tpp:replication-factor}\",  \"errors.deadletterqueue.context.headers.enable\": true,  \"transforms\": \"http\",  \"transforms.http.type\": \"ru.typik.HttpTransform\",  \"grouper.parallelCount\": \"50\",  \"backoffTimeoutMs\": \"${tpp:backoffTimeoutMs}\",  \"response.validator\": \"ru.typik.kafka.connect.task.StatusResponseValidator\",  \"response.validator.codes.success\": \"200\",  \"response.validator.codes.error\": \"400\" } <\/code><\/pre>\n<p>What an implementation of <code>HttpTransform<\/code> might look like:  <\/p>\n<pre><code class=\"kotlin\">class HttpTransform&lt;R : ConnectRecord&lt;R&gt;&gt; : Transformation&lt;R&gt; {     override fun close() {}     override fun configure(configs: MutableMap&lt;String, *&gt;?) {}     override fun config(): ConfigDef = ConfigDef()     protected fun Struct.getMethod(): String = \"GET\"    protected fun Struct.getUrl(): String = \"http:\/\/localhost:8080\"    protected fun Struct.getBody(): String? = \"{}\"    protected fun Struct.getHeaders(): Map&lt;String, String&gt;? = mapOf(        \"Content-Type\" to \"application\/json\",        \"Accept\" to \"application\/json\"    )     override fun apply(record: R): R =        record.newRecord(            record.topic(),            record.kafkaPartition(),            record.keySchema(),            record.key(),            HTTP_REQUEST_SCHEMA,            (record.value() as Struct).let { struct -&gt;                Struct(HTTP_REQUEST_SCHEMA)                    .put(FIELD_HTTP_METHOD, struct.getMethod())                    .put(FIELD_HTTP_URL, struct.getUrl())                    .put(FIELD_HTTP_BODY, struct.getBody())                    .put(FIELD_HTTP_HEADERS, struct.getHeaders())            },            record.timestamp()        ) }<\/code><\/pre>\n<p>I tested this connector implementation with different values of <code>parallelCount<\/code> and built charts in Grafana from the standard Kafka Connect metrics. For the HTTP server I used WireMock, which responds with a small delay.<\/p>\n<p>WireMock configuration:<\/p>\n<pre><code class=\"json\">{  \"mappings\": [    {      \"priority\": 1,      \"request\": {        \"method\": \"GET\",        \"urlPattern\": \"\/\"      },      \"response\": {        \"status\": 200,        \"fixedDelayMilliseconds\": 200,        \"headers\": {          \"Content-Type\": \"application\/json\"        }      }    }  ] }<\/code><\/pre>\n<p>The resulting charts:<\/p>\n<figure class=\"full-width\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/habrastorage.org\/r\/w1560\/getpro\/habr\/upload_files\/3ce\/7d0\/52b\/3ce7d052b041a13a0bf63f90455c4e74.png\" alt=\"Load testing\" title=\"Load testing\" width=\"1600\" height=\"301\" data-src=\"https:\/\/habrastorage.org\/getpro\/habr\/upload_files\/3ce\/7d0\/52b\/3ce7d052b041a13a0bf63f90455c4e74.png\"\/><\/p>\n<div><figcaption>Load testing<\/figcaption><\/div>\n<\/figure>\n<p>This is a fairly artificial test scenario: whether sending requests in parallel pays off depends entirely on the HTTP server implementation. For some use cases a different optimization may fit better, one that groups HTTP requests into batches and sends each batch as a single request, but in that case the HTTP server must also be ready for such specific processing logic.
<\/p>\n<\/div>\n<\/div>\n<\/div>\n<p><!----><!----><\/div>\n<p><!----><!----><br \/> Link to the original article <a href=\"https:\/\/habr.com\/ru\/articles\/851916\/\"> https:\/\/habr.com\/ru\/articles\/851916\/<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[],"tags":[],"class_list":["post-436012","post","type-post","status-publish","format-standard","hentry"],"_links":{"self":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/436012","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=436012"}],"version-history":[{"count":0,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/436012\/revisions"}],"wp:attachment":[{"href":"https:\/\/savepearlharbor.com\/index.php
?rest_route=%2Fwp%2Fv2%2Fmedia&parent=436012"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=436012"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=436012"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}