Параллелизм, асинхронность, многопоточность – Reactor почти всё сделает за Вас

от автора

Цель/введение

Реактивные паттерны программирования становятся всё более востребованы при реализации высоконагруженных сервисов. Реактивные фреймворки предоставляют инструменты, позволяющие с минимальными затратами на кодирование использовать механизмы асинхронности и многопоточности.

В качестве примера, предлагаю рассмотреть реализацию сервис индексации данных в ElasticSearch. Данные хранятся в MongoDB, ключевые атрибуты которых синхронизируются с ElasticSearch (функционально похоже на Logstash). В проекте используется стек: Java/Spring Boot/Reactor/WebFlux/WebClient/RabbitMQ/MongoDB. На выбор RabbitMQ и MongoDB повлияло, в том числе, наличие реактивных драйверов.

Описание задачи

  1. Сервис должен принимать поток данных из очереди, выбирать связанные данные из базы и передавать их ElasticSearch. Формат данных очереди: действие (index/delete); id документа; имя индекса; тип индекса (опционально).

  2. Через web-интерфейс должен быть реализован функционал добавления, удаления и перестроения индекса.

  3. Должна быть возможность формирования агрегированных полей, содержащих данные из нескольких исходных полей документов, и добавление данных в индекс из связанных коллекций.

  4. Описание индексируемых данных должно быть в формате JSON.

DFD-диаграмма процесса индексации

Схема процесса индексации запросов, поступающих из очереди, выглядит следующим образом:

Алгоритм перестроения индекса выглядит практически также, за исключением того, что в нем отсутствует обработка запросов, отложенных из-за ошибок.

Описание функционала

Описание функционала коснется только работы реактивной части сервиса. Конфигурационные настройки, обработка формата описания индексируемых данных, формирование данных для запросов к ElasticSearch вынесены за рамки данной статьи, но вы можете посмотреть код на GitHub, по ссылке.

Теперь попробуем реализовать эту схему сквозным потоком Reactor, не используя подписки на отдельные элементы, в том числе отправку через WebClient HTTP-запросов и обработку полученных ответов. Отдадим, почти полностью, синхронизацию выполнения Reactor.

Код, запускающий процесс переиндексации выглядит следующим образом:

Task task = new Task(mongoElasticIndex); ParallelFlux dataEventsFlux = reactorRepositoryMongoDB         .findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())         .parallel(appConfig.getIndexParallelism())         .runOn(Schedulers.boundedElastic()); Flux<Tuple2<String,Document>> processingData = processingData(dataEventsFlux, (p) -> "index",         (p) -> (Document)p,         (p) -> mongoElasticIndex,         Flux.just(),         task);  task.setDispose(subscribe(processedData, task));

Получаем поток данных из коллекции, настраиваем параллелизм, формируем объект обработки потока и подписываемся на поток. Здесь класс Task – внутренний класс, назначение которого: собирать статистику и предоставлять информацию о выполняемых задачах индексации.

Метод processingData возвращает поток запросов и ответов, отправленных WebClient’ом:

private <T> Flux<Tuple2<String,Document>>     processingData(ParallelFlux<T> events,             Function<T, String> getAction,                    Function<T, Document> getDocument,                    Function<T, MongoElasticIndex> getMongoElasticIndex,                    Flux<String> mergeFlux,                    Task task) {     return  events             // Добавление данных к исходному документу из присоединяемых коллекций         .transform(joinData(getDocument, getMongoElasticIndex))             // Генерация данных для передачи в ElasticSearch         .transform(document2ElasticJson(getAction, getDocument, getMongoElasticIndex))         .sequential()             // Агрегирование данных для _bulk         .transform(grouping(task))             // Добавление потока данных, на которые не получен ответ от ElasticSearch         .mergeWith(mergeFlux)             // Отправка запросов в ElasticSearch         .transform(postBulk(task))         .subscribeOn(Schedulers.single())         .doOnNext(testAliveResponses(task))         .doOnSubscribe(p-> p.request(appConfig.getMaxSizeBuffer() * 2))         .doOnComplete(() -> { logger.info("Start: {} End: {} read {} write {}",                 formatDate(task.getStartDate()),                 formatDate(new Date()),                 task.getDocumentsRead(),                 task.getIndexesWrite(), getMaxProcessingRequest());             fileStorage.writeCollection2Files(waitingForResponse);             removeTask(task);         }); }

Методом transform Reactor соединяем отдельные обработчики потоков. Здесь есть одно существенное ограничение: входящий и исходящий потоки должны быть однотипными (Flux или ParallelFlux). Нельзя, например, с помощью transform встроить обработчик у которого вход Flux, а выход ParallelFlux.

В метод subscribe сервиса инкапсулирована подписка на поток. Ниже приведена его реализация:

private Disposable subscribe(Flux<Tuple2<String,Document>> events, Task task) {     return  events         .subscribe(             p -> {                 if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация                     waitingForResponse.remove(p.getT1());                 }                 int count = Optional.ofNullable(p.getT2().get("items", List.class))                         .map(List::size)                         .orElse(0);                 task.addIndexesWrite(count);             },             e -> {                 if(task != rabbitMQTask)removeTask(task);                 fileStorage.writeCollection2Files(waitingForResponse);                 logger.error("Error: {}", e.getMessage());             }         ); }

Далее коротко об отдельных функциях обработки потока.

Загрузка документов

Имеются два варианта загрузки:

  • Для всех документов основной коллекции индекса

ParallelFlux dataEventsFlux = reactorRepositoryMongoDB         .findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())         .parallel(appConfig.getIndexParallelism())         .runOn(Schedulers.boundedElastic());

Метод findAll возвращает поток для всех документов коллекции. Parallel и runOn настраивают многопоточность для выборки и дальнейшей обработки.

  • Для единичного запроса, приходящему из очереди

ParallelFlux dataEventsFlux = reactiveQueue.inboundFlux()         .parallel(appConfig.getIndexParallelism())         .runOn(Schedulers.boundedElastic())         .map(msg -> {             IndexEvent indexEvent = reactiveQueue.msg2IndexEvent(msg);             try {                 return CreateIndexItem(indexEvent);             } catch (IllegalObjectIdException | IOException | ConvertDataException e) {                 logger.error("{} For message: {}", String.join(", ",throwable2ListMessage(e)),                         new String(msg.getBody(), StandardCharsets.UTF_8));                 return new IndexItem(null, null, null);             }         })         .filter(e -> nonNull(e.getAction()))         .flatMap(item ->             Flux.zip("delete".equals(item.getAction())                         // Для операции удаления создаётся Document, содержащий _id удаляемого документа                     ? Flux.just(new Document().append("_id", item.getIdDocument().get("_id")))                         // Для операции обновления индекса Document загружается из базы данных                     : reactorRepositoryMongoDB.find(                         item.getMongoElasticIndex().getCollection(),                         item.getIdDocument(),                         item.getMongoElasticIndex().getProjection()),                 Flux.just(item)             )             .map(d -> new EventDocument(d.getT2().getAction(),                     d.getT1(),                     d.getT2().getMongoElasticIndex()))         );

Метод inboundFlux интерфейса reactiveQueue возвращает поток для очереди. Parallel и runOn идентичны предыдущему варианту. Далее событие преобразуется из JSON в объект IndexEvent, по содержимому которого документ извлекаются из базы, или создаётся объект для удаления документа из ElasticSearch.

Добавление данных к исходному документу из присоединяемых коллекций

private <T> Function<ParallelFlux<T>, ParallelFlux<T>>     joinData(Function<T, Document> getDocument,             Function<T, MongoElasticIndex> getMongoElasticIndex) {     return (ParallelFlux<T> items) ->             items.flatMap(p -> {             if(getDocument.apply(p).size() == 1) {                 return Flux.just(p);             }             return                 Flux.fromIterable(getMongoElasticIndex.apply((T) p).getJoinConditions(getDocument.apply(p)))                         .flatMap(it -> Flux.zip(Flux.just(it.getCollection().getJoinedFieldName()),                                 reactorRepositoryMongoDB.find(getMongoElasticIndex.apply((T) p).getCollection(),                                         it.getCondition(),                                         it.getCollection().getProjection())))                         .reduce(p, (acc, t) -> {                             getDocument.apply(acc).put(t.getT1(), t.getT2());                             return acc;                         });                 }         ); }

Метод joinData возвращает функциональный объект, добавляющий данные к исходному документу из документов присоединяемых коллекций. Использование flatMap и Flux.zip позволяет асинхронно запускать и обрабатывать потоки, в том числе и потоки, создаваемые запросами к базе данных mongodb. Все вопросы, связанные с синхронизацией, берет на себя Reactor.

Генерация JSON для ElasticSearch

private <T> Function<ParallelFlux<T>, ParallelFlux<String>>     document2ElasticJson(             Function<T, String> getAction,             Function<T, Document> getDocument,             Function<T, MongoElasticIndex> getMongoElasticIndex) {     return (ParallelFlux<T> items) -> items.map(item -> {         String elasticSend;         try {             Document document = getDocument.apply(item);             MongoElasticIndex mongoElasticIndex = getMongoElasticIndex.apply(item);             elasticSend = "delete".equals(getAction.apply(item))                     ? mongoElasticIndex.deleteBuild(document)                     : mongoElasticIndex.indexBuild(document);         } catch (ConvertDataException e) {             throw new RuntimeException(e);         } catch (JsonProcessingException e) {             throw new UncheckedIOException(e);         }         return elasticSend;     }); } 

Из полученного документа формируется JSON-объект модификации индекса в ElasticSearch. Контролируемые исключения приходится конвертировать в неконтролируемые.

Агрегирование данных для _bulk-запроса

Function<Flux<String>, Flux<String>> grouping(Task task) {     return (Flux<String> source) -> source             .bufferTimeout(appConfig.getMaxSizeBuffer(),                     Duration.ofMillis(appConfig.getMaxDurationBuffer()))             .doOnNext(p -> task.addDocumentsRead(p.size()))             .map(p -> String.join("\n", p)             ); }

Использование _bulk-запроса к ElasticSearch позволяет существенно снизить трафик и повысить производительность индексации. Объединение отправляемых данных несложно сделать при помощи bufferTimeout. Значениями максимального размера буфера и времени ожидания можно найти компромисс между оперативностью обновления данных в ElasticSearch, размером запроса и производительностью.

Отправка запросов ElasticSearch

public Function<Flux<String>, Flux<Tuple2<String, Document>>> postBulk(Task task) {     return (Flux<String> source) -> source         .flatMap(buffer -> {             if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация                 waitingForResponse.add(buffer);             }             return Flux.zip(Flux.just(buffer),                 webClientElastic.post()                     .uri("/_bulk")                     .body(BodyInserters.fromValue(buffer))                     .retrieve()                     .onStatus(httpStatus -> httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),                             response -> Mono.error(new HttpServiceException("System is overloaded",                                     response.rawStatusCode())))                     .onStatus(httpStatus -> httpStatus.is4xxClientError() && !httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),                             response -> Mono.error(new RuntimeException("API not found")))                     .onStatus(HttpStatus::is5xxServerError,                             response -> Mono.error(new HttpServiceException("Server is not responding",                                     response.rawStatusCode())))                     .bodyToFlux(Document.class)                     .retryWhen(Retry.backoff(appConfig.getWebClientRetryMaxAttempts(),                                 Duration.ofSeconds(appConfig.getWebClientRetryMinBackoff()))                         .filter(throwable -> throwable instanceof HttpServiceException)                         .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {                             throw new HttpServiceException("External Service failed to process after max retries",                                     HttpStatus.SERVICE_UNAVAILABLE.value());                         }))             );         });  }

Создаётся поток, отправляющий через WebClient запросы к ElasticSearch. Поток, формируемый методом post WebClient’а, Flux.zip объединяет с запросом, это позволяет при обработке ответа связать полученный ответ с отправленным запросом. С помощью retryWhen, Retry.backoff настроена обработка некоторых ошибок.

Обработка ответов ElasticSearch

private Disposable subscribe(Flux<Tuple2<String,Document>> events, Task task) {     return  events         .subscribe(             p -> {                 if(isNull(task.getMongoElasticIndex())) { // Если задача не переиндексация                     waitingForResponse.remove(p.getT1());                 }                 int count = Optional.ofNullable(p.getT2().get("items", List.class))                         .map(List::size)                         .orElse(0);                 task.addIndexesWrite(count);             },             e -> {                 if(task != rabbitMQTask)removeTask(task);                 fileStorage.writeCollection2Files(waitingForResponse);                 logger.error("Error: {}", e.getMessage());             }         ); }

Обработка ответов ElasticSearch минимальна. Если ответ получен на контролируемый запрос (не запрос на переиндексацию), то запрос удаляется из множества запросов, для которых контролируется получение ответа. Ответы на переиндексацию не контролируются. В полученном ответе атрибут items должен быть списком, содержащим информацию об обработанных документах. На количество элементов в списке увеличивается счетчик обработанных документов.

Настройка WebClient

Основная часть настройки делается в конфигурационном классе, бин возвращает объект WebClient.Builder:

@Bean @Qualifier("elastic") public WebClient.Builder webClientWithTimeout() {     final TcpClient tcpClient = TcpClient             .create()             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)             .doOnConnected(connection -> {                 connection.addHandlerLast(new ReadTimeoutHandler(timeout, TimeUnit.MILLISECONDS));                 connection.addHandlerLast(new WriteTimeoutHandler(timeout, TimeUnit.MILLISECONDS));             });      return WebClient.builder()             .baseUrl(baseUrl +":" + port.toString())             .filter(basicAuthentication(user, password))             .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); } 

В конструкторе сервиса добавляются фильтры, вызываемые при отправке запроса и получении ответа:

this.webClientElastic = webClientElastic         .filter(onRequest())         .filter(onResponse())         .build();

Методы, возвращающие фильтры:

private ExchangeFilterFunction onRequest() {     return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {         addSendRequest();         int sleepCycleCount = 0;         while (getProcessingRequest() > getMaxProcessingRequest()) {             try {                 logger.info("Sleep: {} ProcessingRequest reached {} (MaxProcessingRequest {})", getSleepOverRequest(),                         getProcessingRequest() - 1, getMaxProcessingRequest());                 sleep(getSleepOverRequest());                 if (sleepCycleCount++ > appConfig.getSleepCycleCountMax()) {                     break;                 }             } catch (InterruptedException e) {                 e.printStackTrace();             }         }         logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());         return Mono.just(clientRequest);     }); }  private ExchangeFilterFunction onResponse() {     return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {         addReceiveResponse();         logger.info("Response Status {}", clientResponse.statusCode());         return Mono.just(clientResponse);     }); } 

Фильтры выводят информацию об отправке запросов, получении ответов и модифицируют счетчики отправленных запросов и полученных ответов. Перед отправкой запроса, если превышено количество не полученных ответов, процесс “засыпает” на некоторое время.

Настройка среды выполнения

Для того чтобы запустить этот сервис нам нужны: rabbitmq, mongodb и elasticsearch. Всё это проще установить в Docker. Ещё в самом начале проекта установил Docker Desktop и настроил контейнеры для запуска нужных cервисов. Как это делается можно посмотреть, например, в этой статье. По аналогии установил rabbitmq, mongodb. Добавил конфигурационные файлы и внес изменения в файл docker-compose.yml. Получившиеся настройки Docker можно найти в папке проекта docker-elk. Ниже скриншот запущенного контейнера:

Запуск сервиса

Для тестирования загрузил в базу mongodb 1000 документов. Из Postman и отправляю запрос:

В полученном логе видно, что обработка выполняется в разных потоках:

2022-11-02 15:23:17.396  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.418  INFO 8336 --- [      Thread-42] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.447  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.462  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.475  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.477  INFO 8336 --- [      Thread-33] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:855, serverValue:83}] to localhost:27017 2022-11-02 15:23:17.484  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.485  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.489  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.493  INFO 8336 --- [       Thread-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:856, serverValue:84}] to localhost:27017 2022-11-02 15:23:17.566  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.588  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.622  INFO 8336 --- [       Thread-4] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.651  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.651  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.658  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.658  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.673  INFO 8336 --- [      Thread-52] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:940, serverValue:85}] to localhost:27017 2022-11-02 15:23:17.676  INFO 8336 --- [       Thread-7] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:941, serverValue:86}] to localhost:27017 2022-11-02 15:23:17.715  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.738  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.748  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.772  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.790  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.811  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.818  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.849  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:993, serverValue:87}] to localhost:27017 2022-11-02 15:23:17.851  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:994, serverValue:88}] to localhost:27017 2022-11-02 15:23:17.895  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.898  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.911  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.926  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.944  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.966  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:17.993  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1078, serverValue:90}] to localhost:27017 2022-11-02 15:23:18.002  INFO 8336 --- [      Thread-48] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1079, serverValue:89}] to localhost:27017 2022-11-02 15:23:18.041  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.059  INFO 8336 --- [      Thread-31] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:18.076  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:18.083  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.096  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.135  INFO 8336 --- [      Thread-34] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1156, serverValue:92}] to localhost:27017 2022-11-02 15:23:18.138  INFO 8336 --- [      Thread-39] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1155, serverValue:91}] to localhost:27017 2022-11-02 15:23:18.140  INFO 8336 --- [      Thread-39] ru.mvz.elasticsearch.service.Indexer     : Request: POST http://localhost:9200/_bulk 2022-11-02 15:23:18.180  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.180  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.181  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Start: 2022-11-02 15:23:17.250 End: 2022-11-02 15:23:18.181 read 1000 write 1000

Теперь проверим, что загрузилось.

Запрос к ElasticSearch показывает наличие индекса с 1000 документами:

И попробуем найти что-то в ElasticSearch:

Получен ответ ElasticSearch с найденным документом!

Заключение

В этом материале мне хотелось привести пример сервиса, реализованного с использованием Spring Boot, WebFlux, WebClient, Reactor — надеюсь, что у меня это получилось.

Несколько выводов:

  1. Реактивные фреймворки, в том числе и Reactor, делает за нас существенную часть работы по реализации асинхронных многопоточных алгоритмов, позволяя сосредоточиться на предметной области.

  2. С их помощью можно, достаточно просто, создавать высоконагруженные сервисы.

  3. Для получения максимального эффекта от перехода на реактивные паттерны программирование нужно чтобы вся цепочка вычислений была реактивной, начиная с драйверов доступа к базам данных, очередям, файлам и т.д.

Ещё раз, репозиторий с кодом и настройками находится здесь

Несколько ссылок на используемые материалы:

Шпаргалка по Spring Boot WebClient

Reactive Programming: Reactor и Spring WebFlux — часть 2

Реактивное программирование со Spring, часть 2 Project Reactor


ссылка на оригинал статьи https://habr.com/ru/post/699112/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *