Когда сервисы интегрируются при помощи Kafka очень удобно использовать REST API, как универсальный и стандартный способ обмена сообщениями. При увеличении количества сервисов сложность коммуникаций увеличивается. Для контроля можно и нужно использовать интеграционное тестирование. Такие библиотеки как testcontainers или EmbeddedServer прекрасно помогают организовать такое тестирование. Существуют много примеров для micronaut, Spring Boot и т.д. Но в этих примерах опущены некоторые детали, которые не позволяют с первого раза запустить код. В статье приводятся примеры с подробным описанием и ссылками на код.
Пример
Для простоты можно принять такой REST API.
/runs — POST-метод. Инициализирует запрос в канал связи. Принимает данные и возвращает ключ запроса.
/runs/{key}/status – GET-метод. По ключу возвращает статус запроса. Может принимать следующие значения: UNKNOWN, RUNNING, DONE.
/runs /{key} – GET-метод. По ключу возвращает результат запроса.
Подобный API реализован у livy, хотя и для других задач.
Реализация
Будут использоваться: micronaut, Spring Boot.
micronaut
Контроллер для API.
import io.micronaut.http.annotation.Body; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Get; import io.micronaut.http.annotation.Post; import io.reactivex.Maybe; import io.reactivex.schedulers.Schedulers; import javax.inject.Inject; import java.util.UUID; @Controller("/runs") public class RunController { @Inject RunClient runClient; @Inject RunCache runCache; @Post public String runs(@Body String body) { String key = UUID.randomUUID().toString(); runCache.statuses.put(key, RunStatus.RUNNING); runCache.responses.put(key, ""); runClient.sendRun(key, new Run(key, RunType.REQUEST, "", body)); return key; } @Get("/{key}/status") public Maybe<RunStatus> getRunStatus(String key) { return Maybe.just(key) .subscribeOn(Schedulers.io()) .map(it -> runCache.statuses.getOrDefault(it, RunStatus.UNKNOWN)); } @Get("/{key}") public Maybe<String> getRunResponse(String key) { return Maybe.just(key) .subscribeOn(Schedulers.io()) .map(it -> runCache.responses.getOrDefault(it, "")); } }
Отправка сообщений в kafka.
import io.micronaut.configuration.kafka.annotation.*; import io.micronaut.messaging.annotation.Body; @KafkaClient public interface RunClient { @Topic("runs") void sendRun(@KafkaKey String key, @Body Run run); }
Получение сообщений из kafka.
import io.micronaut.configuration.kafka.annotation.*; import io.micronaut.messaging.annotation.Body; import javax.inject.Inject; @KafkaListener(offsetReset = OffsetReset.EARLIEST) public class RunListener { @Inject RunCalculator runCalculator; @Topic("runs") public void receive(@KafkaKey String key, @Body Run run) { runCalculator.run(key, run); } }
Обработка сообщений происходит в RunCalculator. Для тестов используется особая реализация, в которой происходит переброска сообщений.
import io.micronaut.context.annotation.Replaces; import javax.inject.Inject; import javax.inject.Singleton; import java.util.UUID; @Replaces(RunCalculatorImpl.class) @Singleton public class RunCalculatorWithWork implements RunCalculator { @Inject RunClient runClient; @Inject RunCache runCache; @Override public void run(String key, Run run) { if (RunType.REQUEST.equals(run.getType())) { String runKey = run.getKey(); String newKey = UUID.randomUUID().toString(); String runBody = run.getBody(); runClient.sendRun(newKey, new Run(newKey, RunType.RESPONSE, runKey, runBody + "_calculated")); } else if (RunType.RESPONSE.equals(run.getType())) { runCache.statuses.replace(run.getResponseKey(), RunStatus.DONE); runCache.responses.replace(run.getResponseKey(), run.getBody()); } } }
Тест.
import io.micronaut.http.HttpRequest; import io.micronaut.http.client.HttpClient; import static org.junit.jupiter.api.Assertions.assertEquals; public abstract class RunBase { void run(HttpClient client) { String key = client.toBlocking().retrieve(HttpRequest.POST("/runs", "body")); RunStatus runStatus = RunStatus.UNKNOWN; while (runStatus != RunStatus.DONE) { runStatus = client.toBlocking().retrieve(HttpRequest.GET("/runs/" + key + "/status"), RunStatus.class); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } String response = client.toBlocking().retrieve(HttpRequest.GET("/runs/" + key), String.class); assertEquals("body_calculated", response); } }
Для использования EmbeddedServer необходимо.
Подключить библиотеки:
testImplementation("org.apache.kafka:kafka-clients:2.6.0:test") testImplementation("org.apache.kafka:kafka_2.12:2.6.0") testImplementation("org.apache.kafka:kafka_2.12:2.6.0:test")
Тест может выглядеть так.
import io.micronaut.context.ApplicationContext; import io.micronaut.http.client.HttpClient; import io.micronaut.runtime.server.EmbeddedServer; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; public class RunKeTest extends RunBase { @Test void test() { Map<String, Object> properties = new HashMap<>(); properties.put("kafka.bootstrap.servers", "localhost:9092"); properties.put("kafka.embedded.enabled", "true"); try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class, properties)) { ApplicationContext applicationContext = embeddedServer.getApplicationContext(); HttpClient client = applicationContext.createBean(HttpClient.class, embeddedServer.getURI()); run(client); } } }
Для использования testcontainers необходимо.
Подключить библиотеки:
implementation("org.testcontainers:kafka:1.14.3")
Тест может выглядеть так.
import io.micronaut.context.ApplicationContext; import io.micronaut.http.client.HttpClient; import io.micronaut.runtime.server.EmbeddedServer; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; import java.util.HashMap; import java.util.Map; public class RunTcTest extends RunBase { @Test public void test() { try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.3"))) { kafka.start(); Map<String, Object> properties = new HashMap<>(); properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers()); try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class, properties)) { ApplicationContext applicationContext = embeddedServer.getApplicationContext(); HttpClient client = applicationContext.createBean(HttpClient.class, embeddedServer.getURI()); run(client); } } } }
Spring Boot
Контроллер для API.
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.util.UUID; @RestController @RequestMapping("/runs") public class RunController { @Autowired private RunClient runClient; @Autowired private RunCache runCache; @PostMapping() public String runs(@RequestBody String body) { String key = UUID.randomUUID().toString(); runCache.statuses.put(key, RunStatus.RUNNING); runCache.responses.put(key, ""); runClient.sendRun(key, new Run(key, RunType.REQUEST, "", body)); return key; } @GetMapping("/{key}/status") public RunStatus getRunStatus(@PathVariable String key) { return runCache.statuses.getOrDefault(key, RunStatus.UNKNOWN); } @GetMapping("/{key}") public String getRunResponse(@PathVariable String key) { return runCache.responses.getOrDefault(key, ""); } }
Отправка сообщений в kafka.
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class RunClient { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Autowired private ObjectMapper objectMapper; public void sendRun(String key, Run run) { String data = ""; try { data = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(run); } catch (JsonProcessingException e) { e.printStackTrace(); } kafkaTemplate.send("runs", key, data); } }
Получение сообщений из kafka.
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class RunListener { @Autowired private ObjectMapper objectMapper; @Autowired private RunCalculator runCalculator; @KafkaListener(topics = "runs", groupId = "m-group") public void receive(ConsumerRecord<?, ?> consumerRecord) { String key = consumerRecord.key().toString(); Run run = null; try { run = objectMapper.readValue(consumerRecord.value().toString(), Run.class); } catch (JsonProcessingException e) { e.printStackTrace(); } runCalculator.run(key, run); } }
Обработка сообщений происходит в RunCalculator. Для тестов используется особая реализация, в которой происходит переброска сообщений.
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; @Component public class RunCalculatorWithWork implements RunCalculator { @Autowired RunClient runClient; @Autowired RunCache runCache; @Override public void run(String key, Run run) { if (RunType.REQUEST.equals(run.getType())) { String runKey = run.getKey(); String newKey = UUID.randomUUID().toString(); String runBody = run.getBody(); runClient.sendRun(newKey, new Run(newKey, RunType.RESPONSE, runKey, runBody + "_calculated")); } else if (RunType.RESPONSE.equals(run.getType())) { runCache.statuses.replace(run.getResponseKey(), RunStatus.DONE); runCache.responses.replace(run.getResponseKey(), run.getBody()); } } }
Тест.
import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.http.MediaType; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.MvcResult; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; public abstract class RunBase { void run(MockMvc mockMvc, ObjectMapper objectMapper) throws Exception { MvcResult keyResult = mockMvc.perform(MockMvcRequestBuilders.post("/runs") .content("body") .contentType(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn(); String key = keyResult.getResponse().getContentAsString(); RunStatus runStatus = RunStatus.UNKNOWN; while (runStatus != RunStatus.DONE) { MvcResult statusResult = mockMvc.perform(MockMvcRequestBuilders.get("/runs/" + key + "/status") .contentType(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn(); runStatus = objectMapper.readValue(statusResult.getResponse().getContentAsString(), RunStatus.class); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } String response = mockMvc.perform(MockMvcRequestBuilders.get("/runs/" + key) .contentType(MediaType.APPLICATION_JSON) .accept(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()) .andReturn().getResponse().getContentAsString(); assertEquals("body_calculated", response); } }
Для использования EmbeddedServer необходимо.
Подключить библиотеки:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.5.10.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>2.5.10.RELEASE</version> <scope>test</scope> </dependency>
Тест может выглядеть так.
import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; @AutoConfigureMockMvc @SpringBootTest @EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) @Import(RunKeTest.RunKeTestConfiguration.class) public class RunKeTest extends RunBase { @Autowired private MockMvc mockMvc; @Autowired private ObjectMapper objectMapper; @Test void test() throws Exception { run(mockMvc, objectMapper); } @TestConfiguration static class RunKeTestConfiguration { @Autowired private RunCache runCache; @Autowired private RunClient runClient; @Bean public RunCalculator runCalculator() { RunCalculatorWithWork runCalculatorWithWork = new RunCalculatorWithWork(); runCalculatorWithWork.runCache = runCache; runCalculatorWithWork.runClient = runClient; return runCalculatorWithWork; } } }
Для использования testcontainers необходимо.
Подключить библиотеки:
<dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>1.14.3</version> <scope>test</scope> </dependency>
Тест может выглядеть так.
import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.ClassRule; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.test.web.servlet.MockMvc; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; import java.util.HashMap; import java.util.Map; @AutoConfigureMockMvc @SpringBootTest @Import(RunTcTest.RunTcTestConfiguration.class) public class RunTcTest extends RunBase { @ClassRule public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.3")); static { kafka.start(); } @Autowired private MockMvc mockMvc; @Autowired private ObjectMapper objectMapper; @Test void test() throws Exception { run(mockMvc, objectMapper); } @TestConfiguration static class RunTcTestConfiguration { @Autowired private RunCache runCache; @Autowired private RunClient runClient; @Bean ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "m-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public RunCalculator runCalculator() { RunCalculatorWithWork runCalculatorWithWork = new RunCalculatorWithWork(); runCalculatorWithWork.runCache = runCache; runCalculatorWithWork.runClient = runClient; return runCalculatorWithWork; } } }
Перед всеми тестами необходимо стартовать kafka. Это делается вот таким вот образом:
kafka.start();
Дополнительные свойства для kafka в тестах можно задать в ресурсном файле.
application.yml
spring: kafka: consumer: auto-offset-reset: earliest
Ресурсы и ссылки
PART 1: TESTING KAFKA MICROSERVICES WITH MICRONAUT
ссылка на оригинал статьи https://habr.com/ru/post/536578/
Добавить комментарий