{"id":321500,"date":"2021-04-15T15:00:38","date_gmt":"2021-04-15T15:00:38","guid":{"rendered":"http:\/\/savepearlharbor.com\/?p=321500"},"modified":"-0001-11-30T00:00:00","modified_gmt":"-0001-11-29T21:00:00","slug":"","status":"publish","type":"post","link":"https:\/\/savepearlharbor.com\/?p=321500","title":{"rendered":"\u0418\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435 Spring Cloud Stream Binding \u0441 \u0431\u0440\u043e\u043a\u0435\u0440\u043e\u043c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 Kafka"},"content":{"rendered":"\n<div class=\"post__text post__text_v2\" id=\"post-content-body\">\n<p>\u0412\u0441\u0435\u043c \u043f\u0440\u0438\u0432\u0435\u0442! \u041c\u0435\u043d\u044f \u0437\u043e\u0432\u0443\u0442 \u0412\u0438\u0442\u0430\u043b\u0438\u0439, \u044f \u0440\u0430\u0437\u0440\u0430\u0431\u043e\u0442\u0447\u0438\u043a \u0432 \u043a\u043e\u043c\u043f\u0430\u043d\u0438\u0438 Web3Tech. \u0412 \u044d\u0442\u043e\u043c \u043f\u043e\u0441\u0442\u0435 \u044f \u043f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u043b\u044e \u043e\u0441\u043d\u043e\u0432\u043d\u044b\u0435 \u043a\u043e\u043d\u0446\u0435\u043f\u0446\u0438\u0438 \u0438 \u043a\u043e\u043d\u0441\u0442\u0440\u0443\u043a\u0446\u0438\u0438 \u043f\u043b\u0430\u0442\u0444\u043e\u0440\u043c\u044b Spring Cloud Stream \u0434\u043b\u044f \u043f\u043e\u0434\u0434\u0435\u0440\u0436\u043a\u0438 \u0438 \u0440\u0430\u0431\u043e\u0442\u044b \u0441 \u0431\u0440\u043e\u043a\u0435\u0440\u0430\u043c\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 Kafka, \u0441 \u043f\u043e\u043b\u043d\u044b\u043c \u0446\u0438\u043a\u043b\u043e\u043c \u0438\u0445 \u043a\u043e\u043d\u0442\u0435\u043a\u0441\u0442\u043d\u043e\u0433\u043e unit-\u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u044f. \u041c\u044b \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c \u0442\u0430\u043a\u0443\u044e \u0441\u0445\u0435\u043c\u0443 \u0432 \u0441\u0432\u043e\u0435\u043c \u043f\u0440\u043e\u0435\u043a\u0442\u0435 \u0432\u0441\u0435\u0440\u043e\u0441\u0441\u0438\u0439\u0441\u043a\u043e\u0433\u043e \u044d\u043b\u0435\u043a\u0442\u0440\u043e\u043d\u043d\u043e\u0433\u043e \u0433\u043e\u043b\u043e\u0441\u043e\u0432\u0430\u043d\u0438\u044f \u043d\u0430 \u0431\u043b\u043e\u043a\u0447\u0435\u0439\u043d-\u043f\u043b\u0430\u0442\u0444\u043e\u0440\u043c\u0435 <a href=\"https:\/\/wavesenterprise.com\/\" rel=\"noopener noreferrer nofollow\"><u>Waves Enterprise<\/u><\/a>.<\/p>\n<figure class=\"full-width\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/habrastorage.org\/getpro\/habr\/upload_files\/1cf\/772\/f40\/1cf772f402c6609d7c86ad182de6be8a.jpeg\" width=\"800\" height=\"375\"><figcaption><\/figcaption><\/figure>\n<p>\u042f\u0432\u043b\u044f\u044f\u0441\u044c \u0447\u0430\u0441\u0442\u044c\u044e \u0433\u0440\u0443\u043f\u043f\u044b \u043f\u0440\u043e\u0435\u043a\u0442\u043e\u0432 Spring Cloud, Spring Cloud Stream \u043e\u0441\u043d\u043e\u0432\u0430\u043d \u043d\u0430 Spring Boot \u0438 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u0442 Spring Integration \u0434\u043b\u044f \u043e\u0431\u0435\u0441\u043f\u0435\u0447\u0435\u043d\u0438\u044f \u0441\u0432\u044f\u0437\u0438 \u0441 \u0431\u0440\u043e\u043a\u0435\u0440\u0430\u043c\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439. \u041f\u0440\u0438 \u044d\u0442\u043e\u043c \u043e\u043d \u043b\u0435\u0433\u043a\u043e \u0438\u043d\u0442\u0435\u0433\u0440\u0438\u0440\u0443\u0435\u0442\u0441\u044f \u0441 \u0440\u0430\u0437\u043b\u0438\u0447\u043d\u044b\u043c\u0438 \u0431\u0440\u043e\u043a\u0435\u0440\u0430\u043c\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u0438 \u0442\u0440\u0435\u0431\u0443\u0435\u0442 \u043c\u0438\u043d\u0438\u043c\u0430\u043b\u044c\u043d\u043e\u0439 \u043a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u0438 \u0434\u043b\u044f \u0441\u043e\u0437\u0434\u0430\u043d\u0438\u044f event-driven \u0438\u043b\u0438 message-driven \u043c\u0438\u043a\u0440\u043e\u0441\u0435\u0440\u0432\u0438\u0441\u043e\u0432.<\/p>\n<h2>\u041a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u044f \u0438 \u0437\u0430\u0432\u0438\u0441\u0438\u043c\u043e\u0441\u0442\u0438<\/h2>\n<p>\u0414\u043b\u044f \u043d\u0430\u0447\u0430\u043b\u0430 \u043d\u0430\u043c \u043d\u0443\u0436\u043d\u043e \u0434\u043e\u0431\u0430\u0432\u0438\u0442\u044c \u0437\u0430\u0432\u0438\u0441\u0438\u043c\u043e\u0441\u0442\u044c spring-cloud-starter-stream-kafka \u0432 <em>build.gradle<\/em>:<\/p>\n<pre><code class=\"kotlin\">dependencies {    implementation(kotlin(\"stdlib\"))    implementation(\"org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion\")    implementation(\"com.fasterxml.jackson.module:jackson-module-kotlin\")     implementation(\"org.springframework.boot:spring-boot-starter-web\")    implementation(\"org.springframework.cloud:spring-cloud-starter-stream-kafka\")     testImplementation(\"org.springframework.boot:spring-boot-starter-test\")    testImplementation(\"org.springframework.cloud:spring-cloud-stream-test-support\")    testImplementation(\"org.springframework.kafka:spring-kafka-test:springKafkaTestVersion\") }<\/code><\/pre>\n<p>\u0412 \u043a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u044e \u043f\u0440\u043e\u0435\u043a\u0442\u0430 Spring Cloud Stream \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e \u0432\u043a\u043b\u044e\u0447\u0438\u0442\u044c URL Kafka-\u0431\u0440\u043e\u043a\u0435\u0440\u0430, \u0438\u043c\u044f \u043e\u0447\u0435\u0440\u0435\u0434\u0438 (\u0442\u043e\u043f\u0438\u043a) \u0438 \u0434\u0440\u0443\u0433\u0438\u0435 \u043f\u0430\u0440\u0430\u043c\u0435\u0442\u0440\u044b \u0431\u0438\u043d\u0434\u0438\u043d\u0433\u0430. \u0412\u043e\u0442 \u043f\u0440\u0438\u043c\u0435\u0440 YAML-\u043a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u0438 \u0434\u043b\u044f \u0441\u0435\u0440\u0432\u0438\u0441\u0430 <em>application.yaml<\/em>:<\/p>\n<pre><code class=\"json\">spring:  application:    name: cloud-stream-binding-kafka-app  cloud:    stream:      kafka:        binder:          brokers: 0.0.0.0:8080          configuration:            auto-offset-reset: latest      bindings:        customChannel:                   #Channel name          destination: 0.0.0.0:8080      #Destination to which the message is sent (topic)          group: input-group-N          contentType: application\/json          consumer:            max-attempts: 1            autoCommitOffset: true            autoCommitOnError: false<\/code><\/pre>\n<h3>\u041a\u043e\u043d\u0446\u0435\u043f\u0446\u0438\u044f \u0438 \u043a\u043b\u0430\u0441\u0441\u044b<\/h3>\n<p>\u041f\u043e \u0441\u0443\u0442\u0438, \u043c\u044b \u0438\u043c\u0435\u0435\u043c \u0434\u0435\u043b\u043e \u0441 \u0441\u0435\u0440\u0432\u0438\u0441\u043e\u043c, \u043f\u043e\u0441\u0442\u0440\u043e\u0435\u043d\u043d\u044b\u043c \u043d\u0430 Spring Cloud Stream, \u043a\u043e\u0442\u043e\u0440\u044b\u0439 \u043f\u0440\u043e\u0441\u043b\u0443\u0448\u0438\u0432\u0430\u0435\u0442 \u0432\u0445\u043e\u0434\u044f\u0449\u0443\u044e \u043e\u0447\u0435\u0440\u0435\u0434\u044c, \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u044f \u0431\u0438\u043d\u0434\u0438\u043d\u0433\u0438 (<em>SpringCloudStreamBindingKafkaApp.kt<\/em>):<\/p>\n<pre><code class=\"kotlin\">@EnableBinding(ProducerBinding::class)  @SpringBootApplication    class SpringCloudStreamBindingKafkaApp   fun main(args: Array&lt;String&gt;) {   &nbsp;&nbsp;&nbsp;SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)   }<\/code><\/pre>\n<p>\u0410\u043d\u043d\u043e\u0442\u0430\u0446\u0438\u044f @EnableBinding \u0443\u043a\u0430\u0437\u044b\u0432\u0430\u0435\u0442 \u0441\u0435\u0440\u0432\u0438\u0441\u0443 \u043d\u0430 \u0431\u0438\u043d\u0434\u0438\u043d\u0433 \u043a\u0430\u043a \u0432\u0445\u043e\u0434\u044f\u0449\u0435\u0433\u043e, \u0442\u0430\u043a \u0438 \u0438\u0441\u0445\u043e\u0434\u044f\u0449\u0435\u0433\u043e \u043a\u0430\u043d\u0430\u043b\u0430.<\/p>\n<p>\u0417\u0434\u0435\u0441\u044c \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e \u0443\u0442\u043e\u0447\u043d\u0438\u0442\u044c \u0440\u044f\u0434 \u043a\u043e\u043d\u0446\u0435\u043f\u0446\u0438\u0439.<\/p>\n<p><strong>Binding<\/strong> \u2014&nbsp;\u0438\u043d\u0442\u0435\u0440\u0444\u0435\u0439\u0441, \u0432 \u043a\u043e\u0442\u043e\u0440\u043e\u043c \u043e\u043f\u0438\u0441\u0430\u043d\u044b \u0432\u0445\u043e\u0434\u044f\u0449\u0438\u0435 \u0438 \u0438\u0441\u0445\u043e\u0434\u044f\u0449\u0438\u0435 \u043a\u0430\u043d\u0430\u043b\u044b.<br \/><strong>Binder<\/strong> \u2014 \u0438\u043c\u043f\u043b\u0435\u043c\u0435\u043d\u0442\u0430\u0446\u0438\u044f middleware \u0434\u043b\u044f \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439.<br \/><strong>Channel<\/strong> \u2014 \u043f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u043b\u044f\u0435\u0442 \u043a\u0430\u043d\u0430\u043b \u0434\u043b\u044f \u043f\u0435\u0440\u0435\u0434\u0430\u0447\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u043c\u0435\u0436\u0434\u0443 middleware \u0438 \u043f\u0440\u0438\u043b\u043e\u0436\u0435\u043d\u0438\u0435\u043c.<br \/><strong>StreamListeners<\/strong> \u2014 \u043c\u0435\u0442\u043e\u0434\u044b \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u0432 \u0432\u0438\u0434\u0435 \u0431\u0438\u043d\u043e\u0432 (beans), \u043a\u043e\u0442\u043e\u0440\u044b\u0435 \u0431\u0443\u0434\u0443\u0442 \u0430\u0432\u0442\u043e\u043c\u0430\u0442\u0438\u0447\u0435\u0441\u043a\u0438 \u0432\u044b\u0437\u0432\u0430\u043d\u044b \u043f\u043e\u0441\u043b\u0435 \u0442\u043e\u0433\u043e, \u043a\u0430\u043a MessageConverter \u043e\u0441\u0443\u0449\u0435\u0441\u0442\u0432\u0438\u0442 \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044e \u0438\u043b\u0438 \u0434\u0435\u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044e \u043c\u0435\u0436\u0434\u0443 \u0441\u043e\u0431\u044b\u0442\u0438\u044f\u043c\u0438 \u0432 middleware \u0438 \u0442\u0438\u043f\u0430\u043c\u0438 \u043e\u0431\u044a\u0435\u043a\u0442\u043e\u0432 \u0432 \u0434\u043e\u043c\u0435\u043d\u0435 \u201cDTO\u201d.<br \/><strong>Message Schema<\/strong> \u2014 \u0441\u0445\u0435\u043c\u044b, \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c\u044b\u0435 \u0434\u043b\u044f \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438 \u0438 \u0434\u0435\u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439. \u041c\u043e\u0433\u0443\u0442 \u0431\u044b\u0442\u044c \u043f\u0440\u043e\u0447\u0438\u0442\u0430\u043d\u044b \u0438\u0437 \u0438\u0441\u0442\u043e\u0447\u043d\u0438\u043a\u0430 \u0438\u043b\u0438 \u0434\u0438\u043d\u0430\u043c\u0438\u0447\u0435\u0441\u043a\u0438 \u0437\u0430\u0433\u0440\u0443\u0436\u0435\u043d\u044b.<\/p>\n<h2>\u0422\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435<\/h2>\n<p>\u0427\u0442\u043e\u0431\u044b \u043f\u0440\u043e\u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0435 \u0438 \u043e\u043f\u0435\u0440\u0430\u0446\u0438\u0438 send\/receive, \u043d\u0430\u043c \u043d\u0443\u0436\u043d\u043e \u0441\u043e\u0437\u0434\u0430\u0442\u044c \u043a\u0430\u043a \u043c\u0438\u043d\u0438\u043c\u0443\u043c \u043e\u0434\u043d\u043e\u0433\u043e producer \u0438 \u043e\u0434\u043d\u043e\u0433\u043e consumer. \u0412\u043e\u0442 \u043f\u0440\u043e\u0441\u0442\u0435\u0439\u0448\u0438\u0439 \u043f\u0440\u0438\u043c\u0435\u0440 \u0442\u043e\u0433\u043e, \u043a\u0430\u043a \u044d\u0442\u043e \u043c\u043e\u0436\u043d\u043e \u0441\u0434\u0435\u043b\u0430\u0442\u044c \u0432 Spring Cloud Stream.<\/p>\n<p>\u0418\u043d\u0441\u0442\u0430\u043d\u0441 \u0431\u0438\u043d\u0430 Producer \u0431\u0443\u0434\u0435\u0442 \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u044f\u0442\u044c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0435 \u0432 \u0442\u043e\u043f\u0438\u043a Kafka, \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u044f \u0431\u0438\u043d\u0434\u0435\u0440 (<em>ProducerBinding.kt<\/em>):<\/p>\n<pre><code class=\"kotlin\">interface ProducerBinding {     @Output(BINDING_TARGET_NAME)    fun messageChannel(): MessageChannel }<\/code><\/pre>\n<p>\u0418\u043d\u0441\u0442\u0430\u043d\u0441 \u0431\u0438\u043d\u0430 \u0421onsumer \u0431\u0443\u0434\u0435\u0442 \u0441\u043b\u0443\u0448\u0430\u0442\u044c \u0442\u043e\u043f\u0438\u043a Kafka \u0438 \u043f\u043e\u043b\u0443\u0447\u0430\u0442\u044c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u044f.<\/p>\n<p><em>ConsumerBinding.kt<\/em>:<\/p>\n<pre><code class=\"kotlin\">interface ConsumerBinding {     companion object {        const val BINDING_TARGET_NAME = \"customChannel\"    }     @Input(BINDING_TARGET_NAME)    fun messageChannel(): MessageChannel }<\/code><\/pre>\n<p><em>Consumer.kt<\/em>:<\/p>\n<pre><code class=\"kotlin\">@EnableBinding(ConsumerBinding::class) class Consumer(val messageService: MessageService) {     @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)    fun process(        @Payload message: Map&lt;String, Any?&gt;,        @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?    ) {        messageService.consume(message)    } }<\/code><\/pre>\n<p>\u041c\u044b \u0441\u043e\u0437\u0434\u0430\u043b\u0438 \u0431\u0440\u043e\u043a\u0435\u0440 Kafka \u0441 \u0442\u043e\u043f\u0438\u043a\u043e\u043c. \u0414\u043b\u044f \u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u044f \u0431\u0443\u0434\u0435\u043c \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c \u0432\u0441\u0442\u0440\u043e\u0435\u043d\u043d\u0443\u044e Kafka, \u0434\u043e\u0441\u0442\u0443\u043f\u043d\u0443\u044e \u043d\u0430\u043c \u0441 \u0437\u0430\u0432\u0438\u0441\u0438\u043c\u043e\u0441\u0442\u044c\u044e spring-kafka-test.<\/p>\n<h2>\u0424\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u044c\u043d\u043e\u0435 \u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0441 MessageCollector<\/h2>\n<p>\u041c\u044b \u0438\u043c\u0435\u0435\u043c \u0434\u0435\u043b\u043e \u0441 \u0438\u043c\u043f\u043b\u0435\u043c\u0435\u043d\u0442\u0430\u0446\u0438\u0435\u0439 \u0431\u0438\u043d\u0434\u0435\u0440\u0430, \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u044e\u0449\u0435\u0439 \u0432\u0437\u0430\u0438\u043c\u043e\u0434\u0435\u0439\u0441\u0442\u0432\u043e\u0432\u0430\u0442\u044c \u0441 \u043a\u0430\u043d\u0430\u043b\u0430\u043c\u0438 \u0438 \u043f\u043e\u043b\u0443\u0447\u0430\u0442\u044c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u044f. \u041e\u0442\u043f\u0440\u0430\u0432\u0438\u043c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0435 \u0432 \u043a\u0430\u043d\u0430\u043b ProducerBinding \u0438 \u0437\u0430\u0442\u0435\u043c \u043f\u043e\u043b\u0443\u0447\u0438\u043c \u0435\u0433\u043e \u0432 \u0432\u0438\u0434\u0435 payload&nbsp;<em>ProducerTest.kt<\/em>:<\/p>\n<pre><code class=\"kotlin\">@SpringBootTest class ProducerTest {     @Autowired    lateinit var producerBinding: ProducerBinding     @Autowired    lateinit var messageCollector: MessageCollector     @Test    fun `should produce somePayload to channel`() {        \/\/ ARRANGE        val request = mapOf(1 to \"foo\", 2 to \"bar\", \"three\" to 10101)         \/\/ ACT producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())        val payload = messageCollector.forChannel(producerBinding.messageChannel())            .poll()            .payload         \/\/ ASSERT        val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)        assertTrue(request.entries.stream().allMatch { re -&gt;            re.value == payloadAsMap[re.key.toString()]        })         messageCollector.forChannel(producerBinding.messageChannel()).clear()    } }<\/code><\/pre>\n<h2>\u0422\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0441 \u0431\u0440\u043e\u043a\u0435\u0440\u043e\u043c Embedded Kafka<\/h2>\n<p>\u0418\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c \u0430\u043d\u043d\u043e\u0442\u0430\u0446\u0438\u044e @ClassRule \u0434\u043b\u044f \u0441\u043e\u0437\u0434\u0430\u043d\u0438\u044f \u0431\u0440\u043e\u043a\u0435\u0440\u0430. \u0422\u0430\u043a \u043c\u044b \u0441\u043c\u043e\u0436\u0435\u043c \u043f\u043e\u0434\u043d\u044f\u0442\u044c \u0441\u0435\u0440\u0432\u0435\u0440\u0430 Kafka \u0438 Zookeeper \u043d\u0430 \u0441\u043b\u0443\u0447\u0430\u0439\u043d\u043e\u043c \u043f\u043e\u0440\u0442\u0435 \u043f\u0435\u0440\u0435\u0434 \u043d\u0430\u0447\u0430\u043b\u043e\u043c \u0442\u0435\u0441\u0442\u0430 \u0438 \u0432\u044b\u043a\u043b\u044e\u0447\u0438\u0442\u044c \u0438\u0445, \u043a\u043e\u0433\u0434\u0430 \u0442\u0435\u0441\u0442 \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u0442\u0441\u044f. \u042d\u0442\u043e \u0438\u0437\u0431\u0430\u0432\u043b\u044f\u0435\u0442 \u043d\u0430\u0441 \u043e\u0442 \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e\u0441\u0442\u0438 \u0432 \u0440\u0430\u0431\u043e\u0447\u0435\u043c \u0438\u043d\u0441\u0442\u0430\u043d\u0441\u0435 Kafka \u0438 Zookeper \u043d\u0430 \u0432\u0441\u0451 \u0432\u0440\u0435\u043c\u044f \u043f\u0440\u043e\u0432\u0435\u0434\u0435\u043d\u0438\u044f \u0442\u0435\u0441\u0442\u0430 (<em>ConsumerTest.kt<\/em>):<\/p>\n<pre><code class=\"kotlin\">@SpringBootTest @ActiveProfiles(\"test\") @EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class]) @EnableBinding(ProducerBinding::class) class ConsumerTest {     @Autowired    lateinit var producerBinding: ProducerBinding     @Autowired    lateinit var objectMapper: ObjectMapper     @MockBean    lateinit var messageService: MessageService     companion object {        @ClassRule @JvmField        var embeddedKafka = EmbeddedKafkaRule(1, true, \"any-name-of-topic\")    }     @Test    fun `should consume via txConsumer process`() {        \/\/ ACT        val request = mapOf(1 to \"foo\", 2 to \"bar\")        producerBinding.messageChannel().send(MessageBuilder.withPayload(request)            .setHeader(\"someHeaderName\", \"someHeaderValue\")            .build())         \/\/ ASSERT        val requestAsMap = objectMapper.readValue&lt;Map&lt;String, Any?&gt;&gt;(objectMapper.writeValueAsString(request))        runBlocking {            delay(20)            verify(messageService).consume(requestAsMap)        }    } }<\/code><\/pre>\n<h2>\u0417\u0430\u043a\u043b\u044e\u0447\u0435\u043d\u0438\u0435<\/h2>\n<p>\u0412 \u044d\u0442\u043e\u043c \u043f\u043e\u0441\u0442\u0435 \u044f \u043f\u0440\u043e\u0434\u0435\u043c\u043e\u043d\u0441\u0442\u0440\u0438\u0440\u043e\u0432\u0430\u043b \u0432\u043e\u0437\u043c\u043e\u0436\u043d\u043e\u0441\u0442\u0438 Spring Cloud Stream \u0438 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u044f \u0435\u0433\u043e \u0441 Kafka. Spring Cloud Stream \u043f\u0440\u0435\u0434\u043b\u0430\u0433\u0430\u0435\u0442 \u0443\u0434\u043e\u0431\u043d\u044b\u0439 \u0438\u043d\u0442\u0435\u0440\u0444\u0435\u0439\u0441 \u0441 \u0443\u043f\u0440\u043e\u0449\u0435\u043d\u043d\u044b\u043c\u0438 \u043d\u044e\u0430\u043d\u0441\u0430\u043c\u0438 \u043d\u0430\u0441\u0442\u0440\u043e\u0439\u043a\u0438 \u0431\u0440\u043e\u043a\u0435\u0440\u0430, \u0431\u044b\u0441\u0442\u0440\u043e \u0432\u043d\u0435\u0434\u0440\u044f\u0435\u0442\u0441\u044f, \u0441\u0442\u0430\u0431\u0438\u043b\u044c\u043d\u043e \u0440\u0430\u0431\u043e\u0442\u0430\u0435\u0442 \u0438 \u043f\u043e\u0434\u0434\u0435\u0440\u0436\u0438\u0432\u0430\u0435\u0442 \u0441\u043e\u0432\u0440\u0435\u043c\u0435\u043d\u043d\u044b\u0435 \u043f\u043e\u043f\u0443\u043b\u044f\u0440\u043d\u044b\u0435 \u0431\u0440\u043e\u043a\u0435\u0440\u044b, \u0442\u0430\u043a\u0438\u0435 \u043a\u0430\u043a Kafka. \u041f\u043e \u0438\u0442\u043e\u0433\u0430\u043c \u044f \u043f\u0440\u0438\u0432\u0435\u043b \u0440\u044f\u0434 \u043f\u0440\u0438\u043c\u0435\u0440\u043e\u0432 \u0441 unit-\u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435\u043c \u043d\u0430 \u043e\u0441\u043d\u043e\u0432\u0435 EmbeddedKafkaRule \u0441 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435\u043c MessageCollector.<\/p>\n<p>\u0412\u0441\u0435 \u0438\u0441\u0445\u043e\u0434\u043d\u0438\u043a\u0438 \u043c\u043e\u0436\u043d\u043e \u043d\u0430\u0439\u0442\u0438 \u043d\u0430 <a href=\"https:\/\/github.com\/Vito89\/cloud-stream-binding-kafka-app\" rel=\"noopener noreferrer nofollow\"><u>Github<\/u><\/a>. \u0421\u043f\u0430\u0441\u0438\u0431\u043e \u0437\u0430 \u043f\u0440\u043e\u0447\u0442\u0435\u043d\u0438\u0435!<\/p>\n<\/div>\n<p> \u0441\u0441\u044b\u043b\u043a\u0430 \u043d\u0430 \u043e\u0440\u0438\u0433\u0438\u043d\u0430\u043b \u0441\u0442\u0430\u0442\u044c\u0438 <a href=\"https:\/\/habr.com\/ru\/post\/552448\/\"> https:\/\/habr.com\/ru\/post\/552448\/<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"\n<div class=\"post__text post__text_v2\" id=\"post-content-body\">\n<p>\u0412\u0441\u0435\u043c \u043f\u0440\u0438\u0432\u0435\u0442! \u041c\u0435\u043d\u044f \u0437\u043e\u0432\u0443\u0442 \u0412\u0438\u0442\u0430\u043b\u0438\u0439, \u044f \u0440\u0430\u0437\u0440\u0430\u0431\u043e\u0442\u0447\u0438\u043a \u0432 \u043a\u043e\u043c\u043f\u0430\u043d\u0438\u0438 Web3Tech. \u0412 \u044d\u0442\u043e\u043c \u043f\u043e\u0441\u0442\u0435 \u044f \u043f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u043b\u044e \u043e\u0441\u043d\u043e\u0432\u043d\u044b\u0435 \u043a\u043e\u043d\u0446\u0435\u043f\u0446\u0438\u0438 \u0438 \u043a\u043e\u043d\u0441\u0442\u0440\u0443\u043a\u0446\u0438\u0438 \u043f\u043b\u0430\u0442\u0444\u043e\u0440\u043c\u044b Spring Cloud Stream \u0434\u043b\u044f \u043f\u043e\u0434\u0434\u0435\u0440\u0436\u043a\u0438 \u0438 \u0440\u0430\u0431\u043e\u0442\u044b \u0441 \u0431\u0440\u043e\u043a\u0435\u0440\u0430\u043c\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 Kafka, \u0441 \u043f\u043e\u043b\u043d\u044b\u043c \u0446\u0438\u043a\u043b\u043e\u043c \u0438\u0445 \u043a\u043e\u043d\u0442\u0435\u043a\u0441\u0442\u043d\u043e\u0433\u043e unit-\u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u044f. \u041c\u044b \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c \u0442\u0430\u043a\u0443\u044e \u0441\u0445\u0435\u043c\u0443 \u0432 \u0441\u0432\u043e\u0435\u043c \u043f\u0440\u043e\u0435\u043a\u0442\u0435 \u0432\u0441\u0435\u0440\u043e\u0441\u0441\u0438\u0439\u0441\u043a\u043e\u0433\u043e \u044d\u043b\u0435\u043a\u0442\u0440\u043e\u043d\u043d\u043e\u0433\u043e \u0433\u043e\u043b\u043e\u0441\u043e\u0432\u0430\u043d\u0438\u044f \u043d\u0430 \u0431\u043b\u043e\u043a\u0447\u0435\u0439\u043d-\u043f\u043b\u0430\u0442\u0444\u043e\u0440\u043c\u0435 <a href=\"https:\/\/wavesenterprise.com\/\" rel=\"noopener noreferrer nofollow\"><u>Waves Enterprise<\/u><\/a>.<\/p>\n<figure class=\"full-width\"><figcaption><\/figcaption><\/figure>\n<p>\u042f\u0432\u043b\u044f\u044f\u0441\u044c \u0447\u0430\u0441\u0442\u044c\u044e \u0433\u0440\u0443\u043f\u043f\u044b \u043f\u0440\u043e\u0435\u043a\u0442\u043e\u0432 Spring Cloud, Spring Cloud Stream \u043e\u0441\u043d\u043e\u0432\u0430\u043d \u043d\u0430 Spring Boot \u0438 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u0442 Spring Integration \u0434\u043b\u044f \u043e\u0431\u0435\u0441\u043f\u0435\u0447\u0435\u043d\u0438\u044f \u0441\u0432\u044f\u0437\u0438 \u0441 \u0431\u0440\u043e\u043a\u0435\u0440\u0430\u043c\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439. \u041f\u0440\u0438 \u044d\u0442\u043e\u043c \u043e\u043d \u043b\u0435\u0433\u043a\u043e \u0438\u043d\u0442\u0435\u0433\u0440\u0438\u0440\u0443\u0435\u0442\u0441\u044f \u0441 \u0440\u0430\u0437\u043b\u0438\u0447\u043d\u044b\u043c\u0438 \u0431\u0440\u043e\u043a\u0435\u0440\u0430\u043c\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u0438 \u0442\u0440\u0435\u0431\u0443\u0435\u0442 \u043c\u0438\u043d\u0438\u043c\u0430\u043b\u044c\u043d\u043e\u0439 \u043a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u0438 \u0434\u043b\u044f \u0441\u043e\u0437\u0434\u0430\u043d\u0438\u044f event-driven \u0438\u043b\u0438 message-driven \u043c\u0438\u043a\u0440\u043e\u0441\u0435\u0440\u0432\u0438\u0441\u043e\u0432.<\/p>\n<h2>\u041a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u044f \u0438 \u0437\u0430\u0432\u0438\u0441\u0438\u043c\u043e\u0441\u0442\u0438<\/h2>\n<p>\u0414\u043b\u044f \u043d\u0430\u0447\u0430\u043b\u0430 \u043d\u0430\u043c \u043d\u0443\u0436\u043d\u043e \u0434\u043e\u0431\u0430\u0432\u0438\u0442\u044c \u0437\u0430\u0432\u0438\u0441\u0438\u043c\u043e\u0441\u0442\u044c spring-cloud-starter-stream-kafka \u0432 <em>build.gradle<\/em>:<\/p>\n<pre><code class=\"kotlin\">dependencies {    implementation(kotlin(\"stdlib\"))    implementation(\"org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion\")    implementation(\"com.fasterxml.jackson.module:jackson-module-kotlin\")     implementation(\"org.springframework.boot:spring-boot-starter-web\")    implementation(\"org.springframework.cloud:spring-cloud-starter-stream-kafka\")     testImplementation(\"org.springframework.boot:spring-boot-starter-test\")    testImplementation(\"org.springframework.cloud:spring-cloud-stream-test-support\")    testImplementation(\"org.springframework.kafka:spring-kafka-test:springKafkaTestVersion\") }<\/code><\/pre>\n<p>\u0412 \u043a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u044e \u043f\u0440\u043e\u0435\u043a\u0442\u0430 Spring Cloud Stream \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e \u0432\u043a\u043b\u044e\u0447\u0438\u0442\u044c URL Kafka-\u0431\u0440\u043e\u043a\u0435\u0440\u0430, \u0438\u043c\u044f \u043e\u0447\u0435\u0440\u0435\u0434\u0438 (\u0442\u043e\u043f\u0438\u043a) \u0438 \u0434\u0440\u0443\u0433\u0438\u0435 \u043f\u0430\u0440\u0430\u043c\u0435\u0442\u0440\u044b \u0431\u0438\u043d\u0434\u0438\u043d\u0433\u0430. \u0412\u043e\u0442 \u043f\u0440\u0438\u043c\u0435\u0440 YAML-\u043a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u0438 \u0434\u043b\u044f \u0441\u0435\u0440\u0432\u0438\u0441\u0430 <em>application.yaml<\/em>:<\/p>\n<pre><code class=\"json\">spring:  application:    name: cloud-stream-binding-kafka-app  cloud:    stream:      kafka:        binder:          brokers: 0.0.0.0:8080          configuration:            auto-offset-reset: latest      bindings:        customChannel:                   #Channel name          destination: 0.0.0.0:8080      #Destination to which the message is sent (topic)          group: input-group-N          contentType: application\/json          consumer:            max-attempts: 1            autoCommitOffset: true            autoCommitOnError: false<\/code><\/pre>\n<h3>\u041a\u043e\u043d\u0446\u0435\u043f\u0446\u0438\u044f \u0438 \u043a\u043b\u0430\u0441\u0441\u044b<\/h3>\n<p>\u041f\u043e \u0441\u0443\u0442\u0438, \u043c\u044b \u0438\u043c\u0435\u0435\u043c \u0434\u0435\u043b\u043e \u0441 \u0441\u0435\u0440\u0432\u0438\u0441\u043e\u043c, \u043f\u043e\u0441\u0442\u0440\u043e\u0435\u043d\u043d\u044b\u043c \u043d\u0430 Spring Cloud Stream, \u043a\u043e\u0442\u043e\u0440\u044b\u0439 \u043f\u0440\u043e\u0441\u043b\u0443\u0448\u0438\u0432\u0430\u0435\u0442 \u0432\u0445\u043e\u0434\u044f\u0449\u0443\u044e \u043e\u0447\u0435\u0440\u0435\u0434\u044c, \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u044f \u0431\u0438\u043d\u0434\u0438\u043d\u0433\u0438 (<em>SpringCloudStreamBindingKafkaApp.kt<\/em>):<\/p>\n<pre><code class=\"kotlin\">@EnableBinding(ProducerBinding::class)  @SpringBootApplication    class SpringCloudStreamBindingKafkaApp   fun main(args: Array&lt;String&gt;) {   &nbsp;&nbsp;&nbsp;SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)   }<\/code><\/pre>\n<p>\u0410\u043d\u043d\u043e\u0442\u0430\u0446\u0438\u044f @EnableBinding \u0443\u043a\u0430\u0437\u044b\u0432\u0430\u0435\u0442 \u0441\u0435\u0440\u0432\u0438\u0441\u0443 \u043d\u0430 \u0431\u0438\u043d\u0434\u0438\u043d\u0433 \u043a\u0430\u043a \u0432\u0445\u043e\u0434\u044f\u0449\u0435\u0433\u043e, \u0442\u0430\u043a \u0438 \u0438\u0441\u0445\u043e\u0434\u044f\u0449\u0435\u0433\u043e \u043a\u0430\u043d\u0430\u043b\u0430.<\/p>\n<p>\u0417\u0434\u0435\u0441\u044c \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e \u0443\u0442\u043e\u0447\u043d\u0438\u0442\u044c \u0440\u044f\u0434 \u043a\u043e\u043d\u0446\u0435\u043f\u0446\u0438\u0439.<\/p>\n<p><strong>Binding<\/strong> \u2014&nbsp;\u0438\u043d\u0442\u0435\u0440\u0444\u0435\u0439\u0441, \u0432 \u043a\u043e\u0442\u043e\u0440\u043e\u043c \u043e\u043f\u0438\u0441\u0430\u043d\u044b \u0432\u0445\u043e\u0434\u044f\u0449\u0438\u0435 \u0438 \u0438\u0441\u0445\u043e\u0434\u044f\u0449\u0438\u0435 \u043a\u0430\u043d\u0430\u043b\u044b.<br \/><strong>Binder<\/strong> \u2014 \u0438\u043c\u043f\u043b\u0435\u043c\u0435\u043d\u0442\u0430\u0446\u0438\u044f middleware \u0434\u043b\u044f \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439.<br \/><strong>Channel<\/strong> \u2014 \u043f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u043b\u044f\u0435\u0442 \u043a\u0430\u043d\u0430\u043b \u0434\u043b\u044f \u043f\u0435\u0440\u0435\u0434\u0430\u0447\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u043c\u0435\u0436\u0434\u0443 middleware \u0438 \u043f\u0440\u0438\u043b\u043e\u0436\u0435\u043d\u0438\u0435\u043c.<br \/><strong>StreamListeners<\/strong> \u2014 \u043c\u0435\u0442\u043e\u0434\u044b \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u0432 \u0432\u0438\u0434\u0435 \u0431\u0438\u043d\u043e\u0432 (beans), \u043a\u043e\u0442\u043e\u0440\u044b\u0435 \u0431\u0443\u0434\u0443\u0442 \u0430\u0432\u0442\u043e\u043c\u0430\u0442\u0438\u0447\u0435\u0441\u043a\u0438 \u0432\u044b\u0437\u0432\u0430\u043d\u044b \u043f\u043e\u0441\u043b\u0435 \u0442\u043e\u0433\u043e, \u043a\u0430\u043a MessageConverter \u043e\u0441\u0443\u0449\u0435\u0441\u0442\u0432\u0438\u0442 \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044e \u0438\u043b\u0438 \u0434\u0435\u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044e \u043c\u0435\u0436\u0434\u0443 \u0441\u043e\u0431\u044b\u0442\u0438\u044f\u043c\u0438 \u0432 middleware \u0438 \u0442\u0438\u043f\u0430\u043c\u0438 \u043e\u0431\u044a\u0435\u043a\u0442\u043e\u0432 \u0432 \u0434\u043e\u043c\u0435\u043d\u0435 \u201cDTO\u201d.<br \/><strong>Message Schema<\/strong> \u2014 \u0441\u0445\u0435\u043c\u044b, \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c\u044b\u0435 \u0434\u043b\u044f \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438 \u0438 \u0434\u0435\u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439. \u041c\u043e\u0433\u0443\u0442 \u0431\u044b\u0442\u044c \u043f\u0440\u043e\u0447\u0438\u0442\u0430\u043d\u044b \u0438\u0437 \u0438\u0441\u0442\u043e\u0447\u043d\u0438\u043a\u0430 \u0438\u043b\u0438 \u0434\u0438\u043d\u0430\u043c\u0438\u0447\u0435\u0441\u043a\u0438 \u0437\u0430\u0433\u0440\u0443\u0436\u0435\u043d\u044b.<\/p>\n<h2>\u0422\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435<\/h2>\n<p>\u0427\u0442\u043e\u0431\u044b \u043f\u0440\u043e\u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0435 \u0438 \u043e\u043f\u0435\u0440\u0430\u0446\u0438\u0438 send\/receive, \u043d\u0430\u043c \u043d\u0443\u0436\u043d\u043e \u0441\u043e\u0437\u0434\u0430\u0442\u044c \u043a\u0430\u043a \u043c\u0438\u043d\u0438\u043c\u0443\u043c \u043e\u0434\u043d\u043e\u0433\u043e producer \u0438 \u043e\u0434\u043d\u043e\u0433\u043e consumer. \u0412\u043e\u0442 \u043f\u0440\u043e\u0441\u0442\u0435\u0439\u0448\u0438\u0439 \u043f\u0440\u0438\u043c\u0435\u0440 \u0442\u043e\u0433\u043e, \u043a\u0430\u043a \u044d\u0442\u043e \u043c\u043e\u0436\u043d\u043e \u0441\u0434\u0435\u043b\u0430\u0442\u044c \u0432 Spring Cloud Stream.<\/p>\n<p>\u0418\u043d\u0441\u0442\u0430\u043d\u0441 \u0431\u0438\u043d\u0430 Producer \u0431\u0443\u0434\u0435\u0442 \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u044f\u0442\u044c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0435 \u0432 \u0442\u043e\u043f\u0438\u043a Kafka, \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u044f \u0431\u0438\u043d\u0434\u0435\u0440 (<em>ProducerBinding.kt<\/em>):<\/p>\n<pre><code class=\"kotlin\">interface ProducerBinding {     @Output(BINDING_TARGET_NAME)    fun messageChannel(): MessageChannel }<\/code><\/pre>\n<p>\u0418\u043d\u0441\u0442\u0430\u043d\u0441 \u0431\u0438\u043d\u0430 \u0421onsumer \u0431\u0443\u0434\u0435\u0442 \u0441\u043b\u0443\u0448\u0430\u0442\u044c \u0442\u043e\u043f\u0438\u043a Kafka \u0438 \u043f\u043e\u043b\u0443\u0447\u0430\u0442\u044c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u044f.<\/p>\n<p><em>ConsumerBinding.kt<\/em>:<\/p>\n<pre><code class=\"kotlin\">interface ConsumerBinding {     companion object {        const val BINDING_TARGET_NAME = \"customChannel\"    }     @Input(BINDING_TARGET_NAME)    fun messageChannel(): MessageChannel }<\/code><\/pre>\n<p><em>Consumer.kt<\/em>:<\/p>\n<pre><code class=\"kotlin\">@EnableBinding(ConsumerBinding::class) class Consumer(val messageService: MessageService) {     @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)    fun process(        @Payload message: Map&lt;String, Any?&gt;,        @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?    ) {        messageService.consume(message)    } }<\/code><\/pre>\n<p>\u041c\u044b \u0441\u043e\u0437\u0434\u0430\u043b\u0438 \u0431\u0440\u043e\u043a\u0435\u0440 Kafka \u0441 \u0442\u043e\u043f\u0438\u043a\u043e\u043c. \u0414\u043b\u044f \u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u044f \u0431\u0443\u0434\u0435\u043c \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c \u0432\u0441\u0442\u0440\u043e\u0435\u043d\u043d\u0443\u044e Kafka, \u0434\u043e\u0441\u0442\u0443\u043f\u043d\u0443\u044e \u043d\u0430\u043c \u0441 \u0437\u0430\u0432\u0438\u0441\u0438\u043c\u043e\u0441\u0442\u044c\u044e spring-kafka-test.<\/p>\n<h2>\u0424\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u044c\u043d\u043e\u0435 \u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0441 MessageCollector<\/h2>\n<p>\u041c\u044b \u0438\u043c\u0435\u0435\u043c \u0434\u0435\u043b\u043e \u0441 \u0438\u043c\u043f\u043b\u0435\u043c\u0435\u043d\u0442\u0430\u0446\u0438\u0435\u0439 \u0431\u0438\u043d\u0434\u0435\u0440\u0430, \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u044e\u0449\u0435\u0439 \u0432\u0437\u0430\u0438\u043c\u043e\u0434\u0435\u0439\u0441\u0442\u0432\u043e\u0432\u0430\u0442\u044c \u0441 \u043a\u0430\u043d\u0430\u043b\u0430\u043c\u0438 \u0438 \u043f\u043e\u043b\u0443\u0447\u0430\u0442\u044c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u044f. \u041e\u0442\u043f\u0440\u0430\u0432\u0438\u043c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0435 \u0432 \u043a\u0430\u043d\u0430\u043b ProducerBinding \u0438 \u0437\u0430\u0442\u0435\u043c \u043f\u043e\u043b\u0443\u0447\u0438\u043c \u0435\u0433\u043e \u0432 \u0432\u0438\u0434\u0435 payload&nbsp;<em>ProducerTest.kt<\/em>:<\/p>\n<pre><code class=\"kotlin\">@SpringBootTest class ProducerTest {     @Autowired    lateinit var producerBinding: ProducerBinding     @Autowired    lateinit var messageCollector: MessageCollector     @Test    fun `should produce somePayload to channel`() {        \/\/ ARRANGE        val request = mapOf(1 to \"foo\", 2 to \"bar\", \"three\" to 10101)         \/\/ ACT producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())        val payload = messageCollector.forChannel(producerBinding.messageChannel())            .poll()            .payload         \/\/ ASSERT        val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)        assertTrue(request.entries.stream().allMatch { re -&gt;            re.value == payloadAsMap[re.key.toString()]        })         messageCollector.forChannel(producerBinding.messageChannel()).clear()    } }<\/code><\/pre>\n<h2>\u0422\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0441 \u0431\u0440\u043e\u043a\u0435\u0440\u043e\u043c Embedded Kafka<\/h2>\n<p>\u0418\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c \u0430\u043d\u043d\u043e\u0442\u0430\u0446\u0438\u044e @ClassRule \u0434\u043b\u044f \u0441\u043e\u0437\u0434\u0430\u043d\u0438\u044f \u0431\u0440\u043e\u043a\u0435\u0440\u0430. \u0422\u0430\u043a \u043c\u044b \u0441\u043c\u043e\u0436\u0435\u043c \u043f\u043e\u0434\u043d\u044f\u0442\u044c \u0441\u0435\u0440\u0432\u0435\u0440\u0430 Kafka \u0438 Zookeeper \u043d\u0430 \u0441\u043b\u0443\u0447\u0430\u0439\u043d\u043e\u043c \u043f\u043e\u0440\u0442\u0435 \u043f\u0435\u0440\u0435\u0434 \u043d\u0430\u0447\u0430\u043b\u043e\u043c \u0442\u0435\u0441\u0442\u0430 \u0438 \u0432\u044b\u043a\u043b\u044e\u0447\u0438\u0442\u044c \u0438\u0445, \u043a\u043e\u0433\u0434\u0430 \u0442\u0435\u0441\u0442 \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u0442\u0441\u044f. \u042d\u0442\u043e \u0438\u0437\u0431\u0430\u0432\u043b\u044f\u0435\u0442 \u043d\u0430\u0441 \u043e\u0442 \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e\u0441\u0442\u0438 \u0432 \u0440\u0430\u0431\u043e\u0447\u0435\u043c \u0438\u043d\u0441\u0442\u0430\u043d\u0441\u0435 Kafka \u0438 Zookeper \u043d\u0430 \u0432\u0441\u0451 \u0432\u0440\u0435\u043c\u044f \u043f\u0440\u043e\u0432\u0435\u0434\u0435\u043d\u0438\u044f \u0442\u0435\u0441\u0442\u0430 (<em>ConsumerTest.kt<\/em>):<\/p>\n<pre><code class=\"kotlin\">@SpringBootTest @ActiveProfiles(\"test\") @EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class]) @EnableBinding(ProducerBinding::class) class ConsumerTest {     @Autowired    lateinit var producerBinding: ProducerBinding     @Autowired    lateinit var objectMapper: ObjectMapper     @MockBean    lateinit var messageService: MessageService     companion object {        @ClassRule @JvmField        var embeddedKafka = EmbeddedKafkaRule(1, true, \"any-name-of-topic\")    }     @Test    fun `should consume via txConsumer process`() {        \/\/ ACT        val request = mapOf(1 to \"foo\", 2 to \"bar\")        producerBinding.messageChannel().send(MessageBuilder.withPayload(request)            .setHeader(\"someHeaderName\", \"someHeaderValue\")            .build())         \/\/ ASSERT        val requestAsMap = objectMapper.readValue&lt;Map&lt;String, Any?&gt;&gt;(objectMapper.writeValueAsString(request))        runBlocking {            delay(20)            verify(messageService).consume(requestAsMap)        }    } }<\/code><\/pre>\n<h2>\u0417\u0430\u043a\u043b\u044e\u0447\u0435\u043d\u0438\u0435<\/h2>\n<p>\u0412 \u044d\u0442\u043e\u043c \u043f\u043e\u0441\u0442\u0435 \u044f \u043f\u0440\u043e\u0434\u0435\u043c\u043e\u043d\u0441\u0442\u0440\u0438\u0440\u043e\u0432\u0430\u043b \u0432\u043e\u0437\u043c\u043e\u0436\u043d\u043e\u0441\u0442\u0438 Spring Cloud Stream \u0438 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u044f \u0435\u0433\u043e \u0441 Kafka. Spring Cloud Stream \u043f\u0440\u0435\u0434\u043b\u0430\u0433\u0430\u0435\u0442 \u0443\u0434\u043e\u0431\u043d\u044b\u0439 \u0438\u043d\u0442\u0435\u0440\u0444\u0435\u0439\u0441 \u0441 \u0443\u043f\u0440\u043e\u0449\u0435\u043d\u043d\u044b\u043c\u0438 \u043d\u044e\u0430\u043d\u0441\u0430\u043c\u0438 \u043d\u0430\u0441\u0442\u0440\u043e\u0439\u043a\u0438 \u0431\u0440\u043e\u043a\u0435\u0440\u0430, \u0431\u044b\u0441\u0442\u0440\u043e \u0432\u043d\u0435\u0434\u0440\u044f\u0435\u0442\u0441\u044f, \u0441\u0442\u0430\u0431\u0438\u043b\u044c\u043d\u043e \u0440\u0430\u0431\u043e\u0442\u0430\u0435\u0442 \u0438 \u043f\u043e\u0434\u0434\u0435\u0440\u0436\u0438\u0432\u0430\u0435\u0442 \u0441\u043e\u0432\u0440\u0435\u043c\u0435\u043d\u043d\u044b\u0435 \u043f\u043e\u043f\u0443\u043b\u044f\u0440\u043d\u044b\u0435 \u0431\u0440\u043e\u043a\u0435\u0440\u044b, \u0442\u0430\u043a\u0438\u0435 \u043a\u0430\u043a Kafka. \u041f\u043e \u0438\u0442\u043e\u0433\u0430\u043c \u044f \u043f\u0440\u0438\u0432\u0435\u043b \u0440\u044f\u0434 \u043f\u0440\u0438\u043c\u0435\u0440\u043e\u0432 \u0441 unit-\u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435\u043c \u043d\u0430 \u043e\u0441\u043d\u043e\u0432\u0435 EmbeddedKafkaRule \u0441 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435\u043c MessageCollector.<\/p>\n<p>\u0412\u0441\u0435 \u0438\u0441\u0445\u043e\u0434\u043d\u0438\u043a\u0438 \u043c\u043e\u0436\u043d\u043e \u043d\u0430\u0439\u0442\u0438 \u043d\u0430 <a href=\"https:\/\/github.com\/Vito89\/cloud-stream-binding-kafka-app\" rel=\"noopener noreferrer nofollow\"><u>Github<\/u><\/a>. \u0421\u043f\u0430\u0441\u0438\u0431\u043e \u0437\u0430 \u043f\u0440\u043e\u0447\u0442\u0435\u043d\u0438\u0435!<\/p>\n<\/div>\n<p> \u0441\u0441\u044b\u043b\u043a\u0430 \u043d\u0430 \u043e\u0440\u0438\u0433\u0438\u043d\u0430\u043b \u0441\u0442\u0430\u0442\u044c\u0438 <a href=\"https:\/\/habr.com\/ru\/post\/552448\/\"> https:\/\/habr.com\/ru\/post\/552448\/<\/a><br \/><\/br><\/br><\/br><\/br><\/p>\n<\/div>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[],"tags":[],"class_list":["post-321500","post","type-post","status-publish","format-standard","hentry"],"_links":{"self":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/321500","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=321500"}],"version-history":[{"count":0,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/321500\/revisions"}],"wp:attachment":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=321500"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=321500"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=321500"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}