{"id":316111,"date":"2021-01-09T15:00:12","date_gmt":"2021-01-09T15:00:12","guid":{"rendered":"http:\/\/savepearlharbor.com\/?p=316111"},"modified":"-0001-11-30T00:00:00","modified_gmt":"-0001-11-29T21:00:00","slug":"","status":"publish","type":"post","link":"https:\/\/savepearlharbor.com\/?p=316111","title":{"rendered":"\u0421\u0435\u0440\u0432\u0438\u0441\u044b \u0441 Apache Kafka \u0438 \u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435"},"content":{"rendered":"\n<div class=\"post__text post__text-html post__text_v1\" id=\"post-content-body\">\n<p>\u041a\u043e\u0433\u0434\u0430 \u0441\u0435\u0440\u0432\u0438\u0441\u044b \u0438\u043d\u0442\u0435\u0433\u0440\u0438\u0440\u0443\u044e\u0442\u0441\u044f \u043f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438 <a href=\"https:\/\/www.confluent.io\/blog\/building-a-microservices-ecosystem-with-kafka-streams-and-ksql\/\" rel=\"nofollow\">Kafka<\/a> \u043e\u0447\u0435\u043d\u044c \u0443\u0434\u043e\u0431\u043d\u043e \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c REST API, \u043a\u0430\u043a \u0443\u043d\u0438\u0432\u0435\u0440\u0441\u0430\u043b\u044c\u043d\u044b\u0439 \u0438 \u0441\u0442\u0430\u043d\u0434\u0430\u0440\u0442\u043d\u044b\u0439 \u0441\u043f\u043e\u0441\u043e\u0431 \u043e\u0431\u043c\u0435\u043d\u0430 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u044f\u043c\u0438. \u041f\u0440\u0438 \u0443\u0432\u0435\u043b\u0438\u0447\u0435\u043d\u0438\u0438 \u043a\u043e\u043b\u0438\u0447\u0435\u0441\u0442\u0432\u0430 \u0441\u0435\u0440\u0432\u0438\u0441\u043e\u0432 \u0441\u043b\u043e\u0436\u043d\u043e\u0441\u0442\u044c \u043a\u043e\u043c\u043c\u0443\u043d\u0438\u043a\u0430\u0446\u0438\u0439 \u0443\u0432\u0435\u043b\u0438\u0447\u0438\u0432\u0430\u0435\u0442\u0441\u044f. 
\u0414\u043b\u044f \u043a\u043e\u043d\u0442\u0440\u043e\u043b\u044f \u043c\u043e\u0436\u043d\u043e \u0438 \u043d\u0443\u0436\u043d\u043e \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c \u0438\u043d\u0442\u0435\u0433\u0440\u0430\u0446\u0438\u043e\u043d\u043d\u043e\u0435 \u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435. \u0422\u0430\u043a\u0438\u0435 \u0431\u0438\u0431\u043b\u0438\u043e\u0442\u0435\u043a\u0438 \u043a\u0430\u043a <a href=\"https:\/\/www.testcontainers.org\/\" rel=\"nofollow\">testcontainers<\/a> \u0438\u043b\u0438 EmbeddedServer \u043f\u0440\u0435\u043a\u0440\u0430\u0441\u043d\u043e \u043f\u043e\u043c\u043e\u0433\u0430\u044e\u0442 \u043e\u0440\u0433\u0430\u043d\u0438\u0437\u043e\u0432\u0430\u0442\u044c \u0442\u0430\u043a\u043e\u0435 \u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435. \u0421\u0443\u0449\u0435\u0441\u0442\u0432\u0443\u0435\u0442 \u043c\u043d\u043e\u0433\u043e \u043f\u0440\u0438\u043c\u0435\u0440\u043e\u0432 \u0434\u043b\u044f <a href=\"https:\/\/micronaut.io\/\" rel=\"nofollow\">micronaut<\/a>, <a href=\"https:\/\/spring.io\/projects\/spring-boot\" rel=\"nofollow\">Spring Boot<\/a> \u0438 \u0442.\u0434. \u041d\u043e \u0432 \u044d\u0442\u0438\u0445 \u043f\u0440\u0438\u043c\u0435\u0440\u0430\u0445 \u043e\u043f\u0443\u0449\u0435\u043d\u044b \u043d\u0435\u043a\u043e\u0442\u043e\u0440\u044b\u0435 \u0434\u0435\u0442\u0430\u043b\u0438, \u043a\u043e\u0442\u043e\u0440\u044b\u0435 \u043d\u0435 \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u044e\u0442 \u0441 \u043f\u0435\u0440\u0432\u043e\u0433\u043e \u0440\u0430\u0437\u0430 \u0437\u0430\u043f\u0443\u0441\u0442\u0438\u0442\u044c \u043a\u043e\u0434. 
\u0412 \u0441\u0442\u0430\u0442\u044c\u0435 \u043f\u0440\u0438\u0432\u043e\u0434\u044f\u0442\u0441\u044f \u043f\u0440\u0438\u043c\u0435\u0440\u044b \u0441 \u043f\u043e\u0434\u0440\u043e\u0431\u043d\u044b\u043c \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435\u043c \u0438 \u0441\u0441\u044b\u043b\u043a\u0430\u043c\u0438 \u043d\u0430 \u043a\u043e\u0434.<\/p>\n<p><a name=\"habracut\"><\/a>  <\/p>\n<h1 id=\"primer\">\u041f\u0440\u0438\u043c\u0435\u0440<\/h1>\n<p>  <\/p>\n<p>\u0414\u043b\u044f \u043f\u0440\u043e\u0441\u0442\u043e\u0442\u044b \u043c\u043e\u0436\u043d\u043e \u043f\u0440\u0438\u043d\u044f\u0442\u044c \u0442\u0430\u043a\u043e\u0439 REST API.<\/p>\n<p>  <\/p>\n<p>\/runs \u2014 POST-\u043c\u0435\u0442\u043e\u0434. \u0418\u043d\u0438\u0446\u0438\u0430\u043b\u0438\u0437\u0438\u0440\u0443\u0435\u0442 \u0437\u0430\u043f\u0440\u043e\u0441 \u0432 \u043a\u0430\u043d\u0430\u043b \u0441\u0432\u044f\u0437\u0438. \u041f\u0440\u0438\u043d\u0438\u043c\u0430\u0435\u0442 \u0434\u0430\u043d\u043d\u044b\u0435 \u0438 \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u043a\u043b\u044e\u0447 \u0437\u0430\u043f\u0440\u043e\u0441\u0430.<br \/>  \/runs\/{key}\/status \u2014 GET-\u043c\u0435\u0442\u043e\u0434. \u041f\u043e \u043a\u043b\u044e\u0447\u0443 \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u0441\u0442\u0430\u0442\u0443\u0441 \u0437\u0430\u043f\u0440\u043e\u0441\u0430. \u041c\u043e\u0436\u0435\u0442 \u043f\u0440\u0438\u043d\u0438\u043c\u0430\u0442\u044c \u0441\u043b\u0435\u0434\u0443\u044e\u0449\u0438\u0435 \u0437\u043d\u0430\u0447\u0435\u043d\u0438\u044f: UNKNOWN, RUNNING, DONE.<br \/>  \/runs\/{key} \u2014 GET-\u043c\u0435\u0442\u043e\u0434. 
\u041f\u043e \u043a\u043b\u044e\u0447\u0443 \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u0440\u0435\u0437\u0443\u043b\u044c\u0442\u0430\u0442 \u0437\u0430\u043f\u0440\u043e\u0441\u0430.<\/p>\n<p>  <\/p>\n<p>\u041f\u043e\u0434\u043e\u0431\u043d\u044b\u0439 API \u0440\u0435\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u043d \u0443 <a href=\"https:\/\/livy.apache.org\/docs\/latest\/rest-api.html\" rel=\"nofollow\">livy<\/a>, \u0445\u043e\u0442\u044f \u0438 \u0434\u043b\u044f \u0434\u0440\u0443\u0433\u0438\u0445 \u0437\u0430\u0434\u0430\u0447.<\/p>\n<p>  <\/p>\n<h1 id=\"realizaciya\">\u0420\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044f<\/h1>\n<p>  <\/p>\n<p>\u0411\u0443\u0434\u0443\u0442 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c\u0441\u044f: micronaut, Spring Boot.<\/p>\n<p>  <\/p>\n<h2 id=\"micronaut\">micronaut<\/h2>\n<p>  <\/p>\n<p>\u041a\u043e\u043d\u0442\u0440\u043e\u043b\u043b\u0435\u0440 \u0434\u043b\u044f API.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import io.micronaut.http.annotation.Body; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Get; import io.micronaut.http.annotation.Post; import io.reactivex.Maybe; import io.reactivex.schedulers.Schedulers;  import javax.inject.Inject; import java.util.UUID;  @Controller(&quot;\/runs&quot;) public class RunController {     @Inject     RunClient runClient;      @Inject     RunCache runCache;      @Post     public String runs(@Body String body) {         String key = UUID.randomUUID().toString();         runCache.statuses.put(key, RunStatus.RUNNING);         runCache.responses.put(key, &quot;&quot;);         runClient.sendRun(key, new Run(key, RunType.REQUEST, &quot;&quot;, body));         return key;     }      @Get(&quot;\/{key}\/status&quot;)     public Maybe&lt;RunStatus&gt; getRunStatus(String key) {         return Maybe.just(key)                 .subscribeOn(Schedulers.io())                 .map(it -&gt; 
runCache.statuses.getOrDefault(it, RunStatus.UNKNOWN));     }      @Get(&quot;\/{key}&quot;)     public Maybe&lt;String&gt; getRunResponse(String key) {         return Maybe.just(key)                 .subscribeOn(Schedulers.io())                 .map(it -&gt; runCache.responses.getOrDefault(it, &quot;&quot;));     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u041e\u0442\u043f\u0440\u0430\u0432\u043a\u0430 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u0432 kafka.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import io.micronaut.configuration.kafka.annotation.*; import io.micronaut.messaging.annotation.Body;  @KafkaClient public interface RunClient {     @Topic(&quot;runs&quot;)     void sendRun(@KafkaKey String key, @Body Run run); }<\/code><\/pre>\n<p>  <\/p>\n<p>\u041f\u043e\u043b\u0443\u0447\u0435\u043d\u0438\u0435 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u0438\u0437 kafka.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import io.micronaut.configuration.kafka.annotation.*; import io.micronaut.messaging.annotation.Body;  import javax.inject.Inject;  @KafkaListener(offsetReset = OffsetReset.EARLIEST) public class RunListener {     @Inject     RunCalculator runCalculator;      @Topic(&quot;runs&quot;)     public void receive(@KafkaKey String key, @Body Run run) {         runCalculator.run(key, run);     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u041e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0430 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u043f\u0440\u043e\u0438\u0441\u0445\u043e\u0434\u0438\u0442 \u0432 RunCalculator. 
\u0414\u043b\u044f \u0442\u0435\u0441\u0442\u043e\u0432 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u0442\u0441\u044f \u043e\u0441\u043e\u0431\u0430\u044f \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044f, \u0432 \u043a\u043e\u0442\u043e\u0440\u043e\u0439 \u043f\u0440\u043e\u0438\u0441\u0445\u043e\u0434\u0438\u0442 \u043f\u0435\u0440\u0435\u0431\u0440\u043e\u0441\u043a\u0430 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import io.micronaut.context.annotation.Replaces;  import javax.inject.Inject; import javax.inject.Singleton; import java.util.UUID;  @Replaces(RunCalculatorImpl.class) @Singleton public class RunCalculatorWithWork implements RunCalculator {     @Inject     RunClient runClient;      @Inject     RunCache runCache;      @Override     public void run(String key, Run run) {         if (RunType.REQUEST.equals(run.getType())) {             String runKey = run.getKey();             String newKey = UUID.randomUUID().toString();             String runBody = run.getBody();             runClient.sendRun(newKey, new Run(newKey, RunType.RESPONSE, runKey, runBody + &quot;_calculated&quot;));         } else if (RunType.RESPONSE.equals(run.getType())) {             runCache.statuses.replace(run.getResponseKey(), RunStatus.DONE);             runCache.responses.replace(run.getResponseKey(), run.getBody());         }     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0422\u0435\u0441\u0442.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import io.micronaut.http.HttpRequest; import io.micronaut.http.client.HttpClient;  import static org.junit.jupiter.api.Assertions.assertEquals;  public abstract class RunBase {     void run(HttpClient client) {         String key = client.toBlocking().retrieve(HttpRequest.POST(&quot;\/runs&quot;, &quot;body&quot;));         RunStatus runStatus = RunStatus.UNKNOWN;         while (runStatus != RunStatus.DONE) {             runStatus = 
client.toBlocking().retrieve(HttpRequest.GET(&quot;\/runs\/&quot; + key + &quot;\/status&quot;), RunStatus.class);             try {                 Thread.sleep(500);             } catch (InterruptedException e) {                 e.printStackTrace();             }         }         String response = client.toBlocking().retrieve(HttpRequest.GET(&quot;\/runs\/&quot; + key), String.class);         assertEquals(&quot;body_calculated&quot;, response);     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0414\u043b\u044f \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u044f EmbeddedServer \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e.<\/p>\n<p>  <\/p>\n<p>\u041f\u043e\u0434\u043a\u043b\u044e\u0447\u0438\u0442\u044c \u0431\u0438\u0431\u043b\u0438\u043e\u0442\u0435\u043a\u0438:<\/p>\n<p>  <\/p>\n<pre><code class=\"plaintext\">testImplementation(&quot;org.apache.kafka:kafka-clients:2.6.0:test&quot;) testImplementation(&quot;org.apache.kafka:kafka_2.12:2.6.0&quot;) testImplementation(&quot;org.apache.kafka:kafka_2.12:2.6.0:test&quot;)<\/code><\/pre>\n<p>  <\/p>\n<p>\u0422\u0435\u0441\u0442 \u043c\u043e\u0436\u0435\u0442 \u0432\u044b\u0433\u043b\u044f\u0434\u0435\u0442\u044c \u0442\u0430\u043a.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import io.micronaut.context.ApplicationContext; import io.micronaut.http.client.HttpClient; import io.micronaut.runtime.server.EmbeddedServer; import org.junit.jupiter.api.Test;  import java.util.HashMap; import java.util.Map;  public class RunKeTest extends RunBase {     @Test     void test() {         Map&lt;String, Object&gt; properties = new HashMap&lt;&gt;();         properties.put(&quot;kafka.bootstrap.servers&quot;, &quot;localhost:9092&quot;);         properties.put(&quot;kafka.embedded.enabled&quot;, &quot;true&quot;);         try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class, properties)) {             ApplicationContext applicationContext = 
embeddedServer.getApplicationContext();             HttpClient client = applicationContext.createBean(HttpClient.class, embeddedServer.getURI());              run(client);         }     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0414\u043b\u044f \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u044f <a href=\"https:\/\/www.testcontainers.org\/\" rel=\"nofollow\">testcontainers<\/a> \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e.<\/p>\n<p>  <\/p>\n<p>\u041f\u043e\u0434\u043a\u043b\u044e\u0447\u0438\u0442\u044c \u0431\u0438\u0431\u043b\u0438\u043e\u0442\u0435\u043a\u0438:<\/p>\n<p>  <\/p>\n<pre><code class=\"plaintext\">implementation(&quot;org.testcontainers:kafka:1.14.3&quot;)<\/code><\/pre>\n<p>  <\/p>\n<p>\u0422\u0435\u0441\u0442 \u043c\u043e\u0436\u0435\u0442 \u0432\u044b\u0433\u043b\u044f\u0434\u0435\u0442\u044c \u0442\u0430\u043a.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import io.micronaut.context.ApplicationContext; import io.micronaut.http.client.HttpClient; import io.micronaut.runtime.server.EmbeddedServer;  import org.junit.jupiter.api.Test;  import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName;  import java.util.HashMap; import java.util.Map;  public class RunTcTest extends RunBase {      @Test     public void test() {         try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse(&quot;confluentinc\/cp-kafka:5.5.3&quot;))) {             kafka.start();             Map&lt;String, Object&gt; properties = new HashMap&lt;&gt;();             properties.put(&quot;kafka.bootstrap.servers&quot;, kafka.getBootstrapServers());             try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class, properties)) {                 ApplicationContext applicationContext = embeddedServer.getApplicationContext();                 HttpClient client = applicationContext.createBean(HttpClient.class, embeddedServer.getURI());                  run(client);   
          }         }     } }<\/code><\/pre>\n<p>  <\/p>\n<h2 id=\"spring-boot\">Spring Boot<\/h2>\n<p>  <\/p>\n<p>\u041a\u043e\u043d\u0442\u0440\u043e\u043b\u043b\u0435\u0440 \u0434\u043b\u044f API.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*;  import java.util.UUID;  @RestController @RequestMapping(&quot;\/runs&quot;) public class RunController {     @Autowired     private RunClient runClient;      @Autowired     private RunCache runCache;      @PostMapping()     public String runs(@RequestBody String body) {         String key = UUID.randomUUID().toString();         runCache.statuses.put(key, RunStatus.RUNNING);         runCache.responses.put(key, &quot;&quot;);         runClient.sendRun(key, new Run(key, RunType.REQUEST, &quot;&quot;, body));         return key;     }      @GetMapping(&quot;\/{key}\/status&quot;)     public RunStatus getRunStatus(@PathVariable String key) {         return runCache.statuses.getOrDefault(key, RunStatus.UNKNOWN);     }      @GetMapping(&quot;\/{key}&quot;)     public String getRunResponse(@PathVariable String key) {         return runCache.responses.getOrDefault(key, &quot;&quot;);     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u041e\u0442\u043f\u0440\u0430\u0432\u043a\u0430 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u0432 kafka.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component;  @Component public class RunClient {     @Autowired     private KafkaTemplate&lt;String, String&gt; kafkaTemplate;      @Autowired     private ObjectMapper objectMapper;      public void sendRun(String key, Run run) {         String data = &quot;&quot;;         try {          
   data = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(run);         } catch (JsonProcessingException e) {             e.printStackTrace();         }         kafkaTemplate.send(&quot;runs&quot;, key, data);     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u041f\u043e\u043b\u0443\u0447\u0435\u043d\u0438\u0435 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u0438\u0437 kafka.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;  @Component public class RunListener {     @Autowired     private ObjectMapper objectMapper;      @Autowired     private RunCalculator runCalculator;      @KafkaListener(topics = &quot;runs&quot;, groupId = &quot;m-group&quot;)     public void receive(ConsumerRecord&lt;?, ?&gt; consumerRecord) {         String key = consumerRecord.key().toString();         Run run = null;         try {             run = objectMapper.readValue(consumerRecord.value().toString(), Run.class);         } catch (JsonProcessingException e) {             e.printStackTrace();         }         runCalculator.run(key, run);     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u041e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0430 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u043f\u0440\u043e\u0438\u0441\u0445\u043e\u0434\u0438\u0442 \u0432 RunCalculator. 
\u0414\u043b\u044f \u0442\u0435\u0441\u0442\u043e\u0432 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u0442\u0441\u044f \u043e\u0441\u043e\u0431\u0430\u044f \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044f, \u0432 \u043a\u043e\u0442\u043e\u0440\u043e\u0439 \u043f\u0440\u043e\u0438\u0441\u0445\u043e\u0434\u0438\u0442 \u043f\u0435\u0440\u0435\u0431\u0440\u043e\u0441\u043a\u0430 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;  import java.util.UUID;  @Component public class RunCalculatorWithWork implements RunCalculator {     @Autowired     RunClient runClient;      @Autowired     RunCache runCache;      @Override     public void run(String key, Run run) {         if (RunType.REQUEST.equals(run.getType())) {             String runKey = run.getKey();             String newKey = UUID.randomUUID().toString();             String runBody = run.getBody();             runClient.sendRun(newKey, new Run(newKey, RunType.RESPONSE, runKey, runBody + &quot;_calculated&quot;));         } else if (RunType.RESPONSE.equals(run.getType())) {             runCache.statuses.replace(run.getResponseKey(), RunStatus.DONE);             runCache.responses.replace(run.getResponseKey(), run.getBody());         }     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0422\u0435\u0441\u0442.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.http.MediaType; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.MvcResult; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;  import static org.junit.jupiter.api.Assertions.assertEquals; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;  public abstract class RunBase {     void run(MockMvc mockMvc, ObjectMapper 
objectMapper) throws Exception {         MvcResult keyResult = mockMvc.perform(MockMvcRequestBuilders.post(&quot;\/runs&quot;)                 .content(&quot;body&quot;)                 .contentType(MediaType.APPLICATION_JSON)                 .accept(MediaType.APPLICATION_JSON))                 .andExpect(status().isOk())                 .andReturn();          String key = keyResult.getResponse().getContentAsString();         RunStatus runStatus = RunStatus.UNKNOWN;         while (runStatus != RunStatus.DONE) {             MvcResult statusResult = mockMvc.perform(MockMvcRequestBuilders.get(&quot;\/runs\/&quot; + key + &quot;\/status&quot;)                     .contentType(MediaType.APPLICATION_JSON)                     .accept(MediaType.APPLICATION_JSON))                     .andExpect(status().isOk())                     .andReturn();             runStatus = objectMapper.readValue(statusResult.getResponse().getContentAsString(), RunStatus.class);             try {                 Thread.sleep(500);             } catch (InterruptedException e) {                 e.printStackTrace();             }         }         String response = mockMvc.perform(MockMvcRequestBuilders.get(&quot;\/runs\/&quot; + key)                 .contentType(MediaType.APPLICATION_JSON)                 .accept(MediaType.APPLICATION_JSON))                 .andExpect(status().isOk())                 .andReturn().getResponse().getContentAsString();         assertEquals(&quot;body_calculated&quot;, response);     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0414\u043b\u044f \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u044f EmbeddedServer \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e.<\/p>\n<p>  <\/p>\n<p>\u041f\u043e\u0434\u043a\u043b\u044e\u0447\u0438\u0442\u044c \u0431\u0438\u0431\u043b\u0438\u043e\u0442\u0435\u043a\u0438:<\/p>\n<p>  <\/p>\n<pre><code class=\"plaintext\">&lt;dependency&gt;     &lt;groupId&gt;org.springframework.kafka&lt;\/groupId&gt;     
&lt;artifactId&gt;spring-kafka&lt;\/artifactId&gt;     &lt;version&gt;2.5.10.RELEASE&lt;\/version&gt; &lt;\/dependency&gt;  &lt;dependency&gt;     &lt;groupId&gt;org.springframework.kafka&lt;\/groupId&gt;     &lt;artifactId&gt;spring-kafka-test&lt;\/artifactId&gt;     &lt;version&gt;2.5.10.RELEASE&lt;\/version&gt;     &lt;scope&gt;test&lt;\/scope&gt; &lt;\/dependency&gt;<\/code><\/pre>\n<p>  <\/p>\n<p>\u0422\u0435\u0441\u0442 \u043c\u043e\u0436\u0435\u0442 \u0432\u044b\u0433\u043b\u044f\u0434\u0435\u0442\u044c \u0442\u0430\u043a.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc;  @AutoConfigureMockMvc @SpringBootTest @EmbeddedKafka(partitions = 1, brokerProperties = {&quot;listeners=PLAINTEXT:\/\/localhost:9092&quot;, &quot;port=9092&quot;}) @Import(RunKeTest.RunKeTestConfiguration.class) public class RunKeTest extends RunBase {     @Autowired     private MockMvc mockMvc;      @Autowired     private ObjectMapper objectMapper;      @Test     void test() throws Exception {         run(mockMvc, objectMapper);     }      @TestConfiguration     static class RunKeTestConfiguration {         @Autowired         private RunCache runCache;          @Autowired         private RunClient runClient;          @Bean         public RunCalculator runCalculator() {             RunCalculatorWithWork runCalculatorWithWork = new RunCalculatorWithWork();             runCalculatorWithWork.runCache = runCache;             
runCalculatorWithWork.runClient = runClient;             return runCalculatorWithWork;         }     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0414\u043b\u044f \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u044f <a href=\"https:\/\/www.testcontainers.org\/\" rel=\"nofollow\">testcontainers<\/a> \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e.<\/p>\n<p>  <\/p>\n<p>\u041f\u043e\u0434\u043a\u043b\u044e\u0447\u0438\u0442\u044c \u0431\u0438\u0431\u043b\u0438\u043e\u0442\u0435\u043a\u0438:<\/p>\n<p>  <\/p>\n<pre><code class=\"plaintext\">&lt;dependency&gt;     &lt;groupId&gt;org.testcontainers&lt;\/groupId&gt;     &lt;artifactId&gt;kafka&lt;\/artifactId&gt;     &lt;version&gt;1.14.3&lt;\/version&gt;     &lt;scope&gt;test&lt;\/scope&gt; &lt;\/dependency&gt;<\/code><\/pre>\n<p>  <\/p>\n<p>\u0422\u0435\u0441\u0442 \u043c\u043e\u0436\u0435\u0442 \u0432\u044b\u0433\u043b\u044f\u0434\u0435\u0442\u044c \u0442\u0430\u043a.<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.ClassRule; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.test.web.servlet.MockMvc; import org.testcontainers.containers.KafkaContainer; import 
org.testcontainers.utility.DockerImageName;  import java.util.HashMap; import java.util.Map;  @AutoConfigureMockMvc @SpringBootTest @Import(RunTcTest.RunTcTestConfiguration.class) public class RunTcTest extends RunBase {     @ClassRule     public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse(&quot;confluentinc\/cp-kafka:5.5.3&quot;));      static {         kafka.start();     }      @Autowired     private MockMvc mockMvc;      @Autowired     private ObjectMapper objectMapper;      @Test     void test() throws Exception {         run(mockMvc, objectMapper);     }      @TestConfiguration     static class RunTcTestConfiguration {         @Autowired         private RunCache runCache;          @Autowired         private RunClient runClient;          @Bean         ConcurrentKafkaListenerContainerFactory&lt;Integer, String&gt; kafkaListenerContainerFactory() {             ConcurrentKafkaListenerContainerFactory&lt;Integer, String&gt; factory = new ConcurrentKafkaListenerContainerFactory&lt;&gt;();             factory.setConsumerFactory(consumerFactory());             return factory;         }          @Bean         public ConsumerFactory&lt;Integer, String&gt; consumerFactory() {             return new DefaultKafkaConsumerFactory&lt;&gt;(consumerConfigs());         }          @Bean         public Map&lt;String, Object&gt; consumerConfigs() {             Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());             props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;);             props.put(ConsumerConfig.GROUP_ID_CONFIG, &quot;m-group&quot;);             props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);             props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);             return props;         }          @Bean         public ProducerFactory&lt;String, 
String&gt; producerFactory() {             Map&lt;String, Object&gt; configProps = new HashMap&lt;&gt;();             configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());             configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);             configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);             return new DefaultKafkaProducerFactory&lt;&gt;(configProps);         }          @Bean         public KafkaTemplate&lt;String, String&gt; kafkaTemplate() {             return new KafkaTemplate&lt;&gt;(producerFactory());         }          @Bean         public RunCalculator runCalculator() {             RunCalculatorWithWork runCalculatorWithWork = new RunCalculatorWithWork();             runCalculatorWithWork.runCache = runCache;             runCalculatorWithWork.runClient = runClient;             return runCalculatorWithWork;         }     } }<\/code><\/pre>\n<p>  <\/p>\n<p>\u041f\u0435\u0440\u0435\u0434 \u0432\u0441\u0435\u043c\u0438 \u0442\u0435\u0441\u0442\u0430\u043c\u0438 \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e \u0441\u0442\u0430\u0440\u0442\u043e\u0432\u0430\u0442\u044c kafka. 
\u042d\u0442\u043e \u0434\u0435\u043b\u0430\u0435\u0442\u0441\u044f \u0432\u043e\u0442 \u0442\u0430\u043a\u0438\u043c \u0432\u043e\u0442 \u043e\u0431\u0440\u0430\u0437\u043e\u043c: <\/p>\n<p>  <\/p>\n<pre><code class=\"java\">kafka.start();<\/code><\/pre>\n<p>  <\/p>\n<p>\u0414\u043e\u043f\u043e\u043b\u043d\u0438\u0442\u0435\u043b\u044c\u043d\u044b\u0435 \u0441\u0432\u043e\u0439\u0441\u0442\u0432\u0430 \u0434\u043b\u044f kafka \u0432 \u0442\u0435\u0441\u0442\u0430\u0445 \u043c\u043e\u0436\u043d\u043e \u0437\u0430\u0434\u0430\u0442\u044c \u0432 \u0440\u0435\u0441\u0443\u0440\u0441\u043d\u043e\u043c \u0444\u0430\u0439\u043b\u0435.<\/p>\n<p>  <\/p>\n<pre><code class=\"plaintext\">application.yml<\/code><\/pre>\n<p>  <\/p>\n<pre><code class=\"plaintext\">spring:   kafka:     consumer:       auto-offset-reset: earliest<\/code><\/pre>\n<p>  <\/p>\n<h1 id=\"resursy-i-ssylki\">\u0420\u0435\u0441\u0443\u0440\u0441\u044b \u0438 \u0441\u0441\u044b\u043b\u043a\u0438<\/h1>\n<p>  <\/p>\n<p><a href=\"https:\/\/github.com\/kartzum\/s-space\/tree\/main\/r-streams\" rel=\"nofollow\">\u041a\u043e\u0434 \u0434\u043b\u044f micronaut<\/a><\/p>\n<p>  <\/p>\n<p><a href=\"https:\/\/github.com\/kartzum\/s-space\/tree\/main\/s-streams\" rel=\"nofollow\">\u041a\u043e\u0434 \u0434\u043b\u044f Spring Boot<\/a><\/p>\n<p>  <\/p>\n<p><a href=\"https:\/\/piotrminkowski.wordpress.com\/2019\/10\/09\/part-1-testing-kafka-microservices-with-micronaut\/\" rel=\"nofollow\">PART 1: TESTING KAFKA MICROSERVICES WITH MICRONAUT<\/a><\/p>\n<p>  <\/p>\n<p><a href=\"https:\/\/www.baeldung.com\/spring-boot-kafka-testing\" rel=\"nofollow\">Testing Kafka and Spring Boot<\/a><\/p>\n<p>  <\/p>\n<p><a href=\"https:\/\/micronaut-projects.github.io\/micronaut-kafka\/latest\/guide\/index.html\" rel=\"nofollow\">Micronaut Kafka<\/a><\/p>\n<p>  <\/p>\n<p><a href=\"https:\/\/spring.io\/projects\/spring-kafka\" rel=\"nofollow\">Spring for Apache Kafka<\/a><\/p>\n<\/div>\n<p> \u0441\u0441\u044b\u043b\u043a\u0430 \u043d\u0430 
\u043e\u0440\u0438\u0433\u0438\u043d\u0430\u043b \u0441\u0442\u0430\u0442\u044c\u0438 <a href=\"https:\/\/habr.com\/ru\/post\/536578\/\"> https:\/\/habr.com\/ru\/post\/536578\/<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"\n<div class=\"post__text post__text-html post__text_v1\" id=\"post-content-body\">\n<p>\u041a\u043e\u0433\u0434\u0430 \u0441\u0435\u0440\u0432\u0438\u0441\u044b \u0438\u043d\u0442\u0435\u0433\u0440\u0438\u0440\u0443\u044e\u0442\u0441\u044f \u043f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438 <a href=\"https:\/\/www.confluent.io\/blog\/building-a-microservices-ecosystem-with-kafka-streams-and-ksql\/\" rel=\"nofollow\">Kafka<\/a> \u043e\u0447\u0435\u043d\u044c \u0443\u0434\u043e\u0431\u043d\u043e \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c REST API, \u043a\u0430\u043a \u0443\u043d\u0438\u0432\u0435\u0440\u0441\u0430\u043b\u044c\u043d\u044b\u0439 \u0438 \u0441\u0442\u0430\u043d\u0434\u0430\u0440\u0442\u043d\u044b\u0439 \u0441\u043f\u043e\u0441\u043e\u0431 \u043e\u0431\u043c\u0435\u043d\u0430 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u044f\u043c\u0438. \u041f\u0440\u0438 \u0443\u0432\u0435\u043b\u0438\u0447\u0435\u043d\u0438\u0438 \u043a\u043e\u043b\u0438\u0447\u0435\u0441\u0442\u0432\u0430 \u0441\u0435\u0440\u0432\u0438\u0441\u043e\u0432 \u0441\u043b\u043e\u0436\u043d\u043e\u0441\u0442\u044c \u043a\u043e\u043c\u043c\u0443\u043d\u0438\u043a\u0430\u0446\u0438\u0439 \u0443\u0432\u0435\u043b\u0438\u0447\u0438\u0432\u0430\u0435\u0442\u0441\u044f. \u0414\u043b\u044f \u043a\u043e\u043d\u0442\u0440\u043e\u043b\u044f \u043c\u043e\u0436\u043d\u043e \u0438 \u043d\u0443\u0436\u043d\u043e \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c \u0438\u043d\u0442\u0435\u0433\u0440\u0430\u0446\u0438\u043e\u043d\u043d\u043e\u0435 \u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435. 
\u0422\u0430\u043a\u0438\u0435 \u0431\u0438\u0431\u043b\u0438\u043e\u0442\u0435\u043a\u0438 \u043a\u0430\u043a <a href=\"https:\/\/www.testcontainers.org\/\" rel=\"nofollow\">testcontainers<\/a> \u0438\u043b\u0438 EmbeddedServer \u043f\u0440\u0435\u043a\u0440\u0430\u0441\u043d\u043e \u043f\u043e\u043c\u043e\u0433\u0430\u044e\u0442 \u043e\u0440\u0433\u0430\u043d\u0438\u0437\u043e\u0432\u0430\u0442\u044c \u0442\u0430\u043a\u043e\u0435 \u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435. \u0421\u0443\u0449\u0435\u0441\u0442\u0432\u0443\u044e\u0442 \u043c\u043d\u043e\u0433\u043e \u043f\u0440\u0438\u043c\u0435\u0440\u043e\u0432 \u0434\u043b\u044f <a href=\"https:\/\/micronaut.io\/\" rel=\"nofollow\">micronaut<\/a>, <a href=\"https:\/\/spring.io\/projects\/spring-boot\" rel=\"nofollow\">Spring Boot<\/a> \u0438 \u0442.\u0434. \u041d\u043e \u0432 \u044d\u0442\u0438\u0445 \u043f\u0440\u0438\u043c\u0435\u0440\u0430\u0445 \u043e\u043f\u0443\u0449\u0435\u043d\u044b \u043d\u0435\u043a\u043e\u0442\u043e\u0440\u044b\u0435 \u0434\u0435\u0442\u0430\u043b\u0438, \u043a\u043e\u0442\u043e\u0440\u044b\u0435 \u043d\u0435 \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u044e\u0442 \u0441 \u043f\u0435\u0440\u0432\u043e\u0433\u043e \u0440\u0430\u0437\u0430 \u0437\u0430\u043f\u0443\u0441\u0442\u0438\u0442\u044c \u043a\u043e\u0434. 
\u0412 \u0441\u0442\u0430\u0442\u044c\u0435 \u043f\u0440\u0438\u0432\u043e\u0434\u044f\u0442\u0441\u044f \u043f\u0440\u0438\u043c\u0435\u0440\u044b \u0441 \u043f\u043e\u0434\u0440\u043e\u0431\u043d\u044b\u043c \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435\u043c \u0438 \u0441\u0441\u044b\u043b\u043a\u0430\u043c\u0438 \u043d\u0430 \u043a\u043e\u0434.<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[],"tags":[],"class_list":["post-316111","post","type-post","status-publish","format-standard","hentry"],"_links":{"self":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/316111","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=316111"}],"version-history":[{"count":0,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/316111\/revisions"}],"wp:attachment":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=316111"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=316111"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=316111"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}