{"id":341197,"date":"2022-11-14T03:00:12","date_gmt":"2022-11-14T03:00:12","guid":{"rendered":"http:\/\/savepearlharbor.com\/?p=341197"},"modified":"-0001-11-30T00:00:00","modified_gmt":"-0001-11-29T21:00:00","slug":"","status":"publish","type":"post","link":"https:\/\/savepearlharbor.com\/?p=341197","title":{"rendered":"<span>\u041f\u0430\u0440\u0430\u043b\u043b\u0435\u043b\u0438\u0437\u043c, \u0430\u0441\u0438\u043d\u0445\u0440\u043e\u043d\u043d\u043e\u0441\u0442\u044c, \u043c\u043d\u043e\u0433\u043e\u043f\u043e\u0442\u043e\u0447\u043d\u043e\u0441\u0442\u044c \u2013 Reactor \u043f\u043e\u0447\u0442\u0438 \u0432\u0441\u0451 \u0441\u0434\u0435\u043b\u0430\u0435\u0442 \u0437\u0430 \u0412\u0430\u0441<\/span>"},"content":{"rendered":"<div><\/div>\n<div id=\"post-content-body\">\n<div>\n<div class=\"article-formatted-body article-formatted-body article-formatted-body_version-2\">\n<div xmlns=\"http:\/\/www.w3.org\/1999\/xhtml\">\n<h2>\u0426\u0435\u043b\u044c\/\u0432\u0432\u0435\u0434\u0435\u043d\u0438\u0435<\/h2>\n<p>\u0420\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u044b\u0435 \u043f\u0430\u0442\u0442\u0435\u0440\u043d\u044b \u043f\u0440\u043e\u0433\u0440\u0430\u043c\u043c\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u044f \u0441\u0442\u0430\u043d\u043e\u0432\u044f\u0442\u0441\u044f \u0432\u0441\u0451 \u0431\u043e\u043b\u0435\u0435 \u0432\u043e\u0441\u0442\u0440\u0435\u0431\u043e\u0432\u0430\u043d\u044b \u043f\u0440\u0438 \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438 \u0432\u044b\u0441\u043e\u043a\u043e\u043d\u0430\u0433\u0440\u0443\u0436\u0435\u043d\u043d\u044b\u0445 \u0441\u0435\u0440\u0432\u0438\u0441\u043e\u0432. \u0420\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u044b\u0435 \u0444\u0440\u0435\u0439\u043c\u0432\u043e\u0440\u043a\u0438 \u043f\u0440\u0435\u0434\u043e\u0441\u0442\u0430\u0432\u043b\u044f\u044e\u0442 \u0438\u043d\u0441\u0442\u0440\u0443\u043c\u0435\u043d\u0442\u044b, \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u044e\u0449\u0438\u0435 \u0441 \u043c\u0438\u043d\u0438\u043c\u0430\u043b\u044c\u043d\u044b\u043c\u0438 \u0437\u0430\u0442\u0440\u0430\u0442\u0430\u043c\u0438 \u043d\u0430 \u043a\u043e\u0434\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c \u043c\u0435\u0445\u0430\u043d\u0438\u0437\u043c\u044b \u0430\u0441\u0438\u043d\u0445\u0440\u043e\u043d\u043d\u043e\u0441\u0442\u0438 \u0438 \u043c\u043d\u043e\u0433\u043e\u043f\u043e\u0442\u043e\u0447\u043d\u043e\u0441\u0442\u0438.<\/p>\n<p>\u0412 \u043a\u0430\u0447\u0435\u0441\u0442\u0432\u0435 \u043f\u0440\u0438\u043c\u0435\u0440\u0430, \u043f\u0440\u0435\u0434\u043b\u0430\u0433\u0430\u044e \u0440\u0430\u0441\u0441\u043c\u043e\u0442\u0440\u0435\u0442\u044c \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044e \u0441\u0435\u0440\u0432\u0438\u0441 \u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438 \u0434\u0430\u043d\u043d\u044b\u0445 \u0432 ElasticSearch. \u0414\u0430\u043d\u043d\u044b\u0435 \u0445\u0440\u0430\u043d\u044f\u0442\u0441\u044f \u0432 MongoDB, \u043a\u043b\u044e\u0447\u0435\u0432\u044b\u0435 \u0430\u0442\u0440\u0438\u0431\u0443\u0442\u044b \u043a\u043e\u0442\u043e\u0440\u044b\u0445 \u0441\u0438\u043d\u0445\u0440\u043e\u043d\u0438\u0437\u0438\u0440\u0443\u044e\u0442\u0441\u044f \u0441 ElasticSearch (\u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u044c\u043d\u043e \u043f\u043e\u0445\u043e\u0436\u0435 \u043d\u0430 Logstash). \u0412 \u043f\u0440\u043e\u0435\u043a\u0442\u0435 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u0442\u0441\u044f \u0441\u0442\u0435\u043a: Java\/Spring Boot\/Reactor\/WebFlux\/WebClient\/RabbitMQ\/MongoDB. \u041d\u0430 \u0432\u044b\u0431\u043e\u0440 RabbitMQ \u0438 MongoDB \u043f\u043e\u0432\u043b\u0438\u044f\u043b\u043e, \u0432 \u0442\u043e\u043c \u0447\u0438\u0441\u043b\u0435, \u043d\u0430\u043b\u0438\u0447\u0438\u0435 \u0440\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u044b\u0445 \u0434\u0440\u0430\u0439\u0432\u0435\u0440\u043e\u0432.<\/p>\n<h3>\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0437\u0430\u0434\u0430\u0447\u0438<\/h3>\n<ol>\n<li>\n<p>\u0421\u0435\u0440\u0432\u0438\u0441 \u0434\u043e\u043b\u0436\u0435\u043d \u043f\u0440\u0438\u043d\u0438\u043c\u0430\u0442\u044c \u043f\u043e\u0442\u043e\u043a \u0434\u0430\u043d\u043d\u044b\u0445 \u0438\u0437 \u043e\u0447\u0435\u0440\u0435\u0434\u0438, \u0432\u044b\u0431\u0438\u0440\u0430\u0442\u044c \u0441\u0432\u044f\u0437\u0430\u043d\u043d\u044b\u0435 \u0434\u0430\u043d\u043d\u044b\u0435 \u0438\u0437 \u0431\u0430\u0437\u044b \u0438 \u043f\u0435\u0440\u0435\u0434\u0430\u0432\u0430\u0442\u044c \u0438\u0445 ElasticSearch. \u0424\u043e\u0440\u043c\u0430\u0442 \u0434\u0430\u043d\u043d\u044b\u0445 \u043e\u0447\u0435\u0440\u0435\u0434\u0438: \u0434\u0435\u0439\u0441\u0442\u0432\u0438\u0435 (index\/delete); id \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430; \u0438\u043c\u044f \u0438\u043d\u0434\u0435\u043a\u0441\u0430; \u0442\u0438\u043f \u0438\u043d\u0434\u0435\u043a\u0441\u0430 (\u043e\u043f\u0446\u0438\u043e\u043d\u0430\u043b\u044c\u043d\u043e).<\/p>\n<\/li>\n<li>\n<p>\u0427\u0435\u0440\u0435\u0437 web-\u0438\u043d\u0442\u0435\u0440\u0444\u0435\u0439\u0441 \u0434\u043e\u043b\u0436\u0435\u043d \u0431\u044b\u0442\u044c \u0440\u0435\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u043d \u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b \u0434\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u044f, \u0443\u0434\u0430\u043b\u0435\u043d\u0438\u044f \u0438 \u043f\u0435\u0440\u0435\u0441\u0442\u0440\u043e\u0435\u043d\u0438\u044f \u0438\u043d\u0434\u0435\u043a\u0441\u0430.<\/p>\n<\/li>\n<li>\n<p>\u0414\u043e\u043b\u0436\u043d\u0430 \u0431\u044b\u0442\u044c \u0432\u043e\u0437\u043c\u043e\u0436\u043d\u043e\u0441\u0442\u044c \u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u044f \u0430\u0433\u0440\u0435\u0433\u0438\u0440\u043e\u0432\u0430\u043d\u043d\u044b\u0445 \u043f\u043e\u043b\u0435\u0439, \u0441\u043e\u0434\u0435\u0440\u0436\u0430\u0449\u0438\u0445 \u0434\u0430\u043d\u043d\u044b\u0435 \u0438\u0437 \u043d\u0435\u0441\u043a\u043e\u043b\u044c\u043a\u0438\u0445 \u0438\u0441\u0445\u043e\u0434\u043d\u044b\u0445 \u043f\u043e\u043b\u0435\u0439 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432, \u0438 \u0434\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u0432 \u0438\u043d\u0434\u0435\u043a\u0441 \u0438\u0437 \u0441\u0432\u044f\u0437\u0430\u043d\u043d\u044b\u0445 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0439.<\/p>\n<\/li>\n<li>\n<p>\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043d\u0434\u0435\u043a\u0441\u0438\u0440\u0443\u0435\u043c\u044b\u0445 \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043e\u043b\u0436\u043d\u043e \u0431\u044b\u0442\u044c \u0432 \u0444\u043e\u0440\u043c\u0430\u0442\u0435 JSON.<\/p>\n<\/li>\n<\/ol>\n<h2>DFD-\u0434\u0438\u0430\u0433\u0440\u0430\u043c\u043c\u0430 \u043f\u0440\u043e\u0446\u0435\u0441\u0441\u0430 \u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438<\/h2>\n<p>\u0421\u0445\u0435\u043c\u0430 \u043f\u0440\u043e\u0446\u0435\u0441\u0441\u0430 \u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432, \u043f\u043e\u0441\u0442\u0443\u043f\u0430\u044e\u0449\u0438\u0445 \u0438\u0437 \u043e\u0447\u0435\u0440\u0435\u0434\u0438, \u0432\u044b\u0433\u043b\u044f\u0434\u0438\u0442 \u0441\u043b\u0435\u0434\u0443\u044e\u0449\u0438\u043c \u043e\u0431\u0440\u0430\u0437\u043e\u043c:<\/p>\n<figure class=\"full-width\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/habrastorage.org\/r\/w1560\/getpro\/habr\/upload_files\/715\/998\/d3c\/715998d3c6631e7db992bd57787d0ee4.png\" width=\"679\" height=\"314\" data-src=\"https:\/\/habrastorage.org\/getpro\/habr\/upload_files\/715\/998\/d3c\/715998d3c6631e7db992bd57787d0ee4.png\"\/><figcaption><\/figcaption><\/figure>\n<p>\u0410\u043b\u0433\u043e\u0440\u0438\u0442\u043c \u043f\u0435\u0440\u0435\u0441\u0442\u0440\u043e\u0435\u043d\u0438\u044f \u0438\u043d\u0434\u0435\u043a\u0441\u0430 \u0432\u044b\u0433\u043b\u044f\u0434\u0438\u0442 \u043f\u0440\u0430\u043a\u0442\u0438\u0447\u0435\u0441\u043a\u0438 \u0442\u0430\u043a\u0436\u0435, \u0437\u0430 \u0438\u0441\u043a\u043b\u044e\u0447\u0435\u043d\u0438\u0435\u043c \u0442\u043e\u0433\u043e, \u0447\u0442\u043e \u0432 \u043d\u0435\u043c \u043e\u0442\u0441\u0443\u0442\u0441\u0442\u0432\u0443\u0435\u0442 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0430 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432, \u043e\u0442\u043b\u043e\u0436\u0435\u043d\u043d\u044b\u0445 \u0438\u0437-\u0437\u0430 \u043e\u0448\u0438\u0431\u043e\u043a.<\/p>\n<h2>\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u0430<\/h2>\n<p>\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u0430 \u043a\u043e\u0441\u043d\u0435\u0442\u0441\u044f \u0442\u043e\u043b\u044c\u043a\u043e \u0440\u0430\u0431\u043e\u0442\u044b \u0440\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u043e\u0439 \u0447\u0430\u0441\u0442\u0438 \u0441\u0435\u0440\u0432\u0438\u0441\u0430. \u041a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u043e\u043d\u043d\u044b\u0435 \u043d\u0430\u0441\u0442\u0440\u043e\u0439\u043a\u0438, \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0430 \u0444\u043e\u0440\u043c\u0430\u0442\u0430 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u044f \u0438\u043d\u0434\u0435\u043a\u0441\u0438\u0440\u0443\u0435\u043c\u044b\u0445 \u0434\u0430\u043d\u043d\u044b\u0445, \u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043b\u044f \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432 \u043a ElasticSearch \u0432\u044b\u043d\u0435\u0441\u0435\u043d\u044b \u0437\u0430 \u0440\u0430\u043c\u043a\u0438 \u0434\u0430\u043d\u043d\u043e\u0439 \u0441\u0442\u0430\u0442\u044c\u0438, \u043d\u043e \u0432\u044b \u043c\u043e\u0436\u0435\u0442\u0435 \u043f\u043e\u0441\u043c\u043e\u0442\u0440\u0435\u0442\u044c \u043a\u043e\u0434 \u043d\u0430 GitHub, \u043f\u043e <a href=\"https:\/\/github.com\/ValentinMorozov\/elastic-search\" rel=\"noopener noreferrer nofollow\">\u0441\u0441\u044b\u043b\u043a\u0435<\/a>.<\/p>\n<p>\u0422\u0435\u043f\u0435\u0440\u044c \u043f\u043e\u043f\u0440\u043e\u0431\u0443\u0435\u043c \u0440\u0435\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u0442\u044c \u044d\u0442\u0443 \u0441\u0445\u0435\u043c\u0443 \u0441\u043a\u0432\u043e\u0437\u043d\u044b\u043c \u043f\u043e\u0442\u043e\u043a\u043e\u043c Reactor, \u043d\u0435 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u044f \u043f\u043e\u0434\u043f\u0438\u0441\u043a\u0438 \u043d\u0430 \u043e\u0442\u0434\u0435\u043b\u044c\u043d\u044b\u0435 \u044d\u043b\u0435\u043c\u0435\u043d\u0442\u044b, \u0432 \u0442\u043e\u043c \u0447\u0438\u0441\u043b\u0435 \u043e\u0442\u043f\u0440\u0430\u0432\u043a\u0443 \u0447\u0435\u0440\u0435\u0437 WebClient HTTP-\u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432 \u0438 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0443 \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u043d\u044b\u0445 \u043e\u0442\u0432\u0435\u0442\u043e\u0432. \u041e\u0442\u0434\u0430\u0434\u0438\u043c, \u043f\u043e\u0447\u0442\u0438 \u043f\u043e\u043b\u043d\u043e\u0441\u0442\u044c\u044e, \u0441\u0438\u043d\u0445\u0440\u043e\u043d\u0438\u0437\u0430\u0446\u0438\u044e \u0432\u044b\u043f\u043e\u043b\u043d\u0435\u043d\u0438\u044f Reactor. <\/p>\n<p>\u041a\u043e\u0434, \u0437\u0430\u043f\u0443\u0441\u043a\u0430\u044e\u0449\u0438\u0439 \u043f\u0440\u043e\u0446\u0435\u0441\u0441 \u043f\u0435\u0440\u0435\u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438 \u0432\u044b\u0433\u043b\u044f\u0434\u0438\u0442 \u0441\u043b\u0435\u0434\u0443\u044e\u0449\u0438\u043c \u043e\u0431\u0440\u0430\u0437\u043e\u043c: <\/p>\n<pre><code>Task task = new Task(mongoElasticIndex); ParallelFlux dataEventsFlux = reactorRepositoryMongoDB         .findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())         .parallel(appConfig.getIndexParallelism())         .runOn(Schedulers.boundedElastic()); Flux&lt;Tuple2&lt;String,Document>> processingData = processingData(dataEventsFlux, (p) -> \"index\",         (p) -> (Document)p,         (p) -> mongoElasticIndex,         Flux.just(),         task);  task.setDispose(subscribe(processedData, task));<\/code><\/pre>\n<p>\u041f\u043e\u043b\u0443\u0447\u0430\u0435\u043c \u043f\u043e\u0442\u043e\u043a \u0434\u0430\u043d\u043d\u044b\u0445 \u0438\u0437 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0438, \u043d\u0430\u0441\u0442\u0440\u0430\u0438\u0432\u0430\u0435\u043c \u043f\u0430\u0440\u0430\u043b\u043b\u0435\u043b\u0438\u0437\u043c,  \u0444\u043e\u0440\u043c\u0438\u0440\u0443\u0435\u043c \u043e\u0431\u044a\u0435\u043a\u0442 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0438 \u043f\u043e\u0442\u043e\u043a\u0430 \u0438 \u043f\u043e\u0434\u043f\u0438\u0441\u044b\u0432\u0430\u0435\u043c\u0441\u044f \u043d\u0430 \u043f\u043e\u0442\u043e\u043a. \u0417\u0434\u0435\u0441\u044c \u043a\u043b\u0430\u0441\u0441 Task \u2013 \u0432\u043d\u0443\u0442\u0440\u0435\u043d\u043d\u0438\u0439 \u043a\u043b\u0430\u0441\u0441, \u043d\u0430\u0437\u043d\u0430\u0447\u0435\u043d\u0438\u0435 \u043a\u043e\u0442\u043e\u0440\u043e\u0433\u043e: \u0441\u043e\u0431\u0438\u0440\u0430\u0442\u044c \u0441\u0442\u0430\u0442\u0438\u0441\u0442\u0438\u043a\u0443 \u0438 \u043f\u0440\u0435\u0434\u043e\u0441\u0442\u0430\u0432\u043b\u044f\u0442\u044c \u0438\u043d\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u044e \u043e \u0432\u044b\u043f\u043e\u043b\u043d\u044f\u0435\u043c\u044b\u0445 \u0437\u0430\u0434\u0430\u0447\u0430\u0445 \u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438.<\/p>\n<p>\u041c\u0435\u0442\u043e\u0434 processingData \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u043f\u043e\u0442\u043e\u043a \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432 \u0438 \u043e\u0442\u0432\u0435\u0442\u043e\u0432, \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u0435\u043d\u043d\u044b\u0445 WebClient\u2019\u043e\u043c:<\/p>\n<pre><code>private &lt;T> Flux&lt;Tuple2&lt;String,Document>>     processingData(ParallelFlux&lt;T> events,             Function&lt;T, String> getAction,                    Function&lt;T, Document> getDocument,                    Function&lt;T, MongoElasticIndex> getMongoElasticIndex,                    Flux&lt;String> mergeFlux,                    Task task) {     return  events             \/\/ \u0414\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u043a \u0438\u0441\u0445\u043e\u0434\u043d\u043e\u043c\u0443 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0443 \u0438\u0437 \u043f\u0440\u0438\u0441\u043e\u0435\u0434\u0438\u043d\u044f\u0435\u043c\u044b\u0445 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0439         .transform(joinData(getDocument, getMongoElasticIndex))             \/\/ \u0413\u0435\u043d\u0435\u0440\u0430\u0446\u0438\u044f \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043b\u044f \u043f\u0435\u0440\u0435\u0434\u0430\u0447\u0438 \u0432 ElasticSearch         .transform(document2ElasticJson(getAction, getDocument, getMongoElasticIndex))         .sequential()             \/\/ \u0410\u0433\u0440\u0435\u0433\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043b\u044f _bulk         .transform(grouping(task))             \/\/ \u0414\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u0435 \u043f\u043e\u0442\u043e\u043a\u0430 \u0434\u0430\u043d\u043d\u044b\u0445, \u043d\u0430 \u043a\u043e\u0442\u043e\u0440\u044b\u0435 \u043d\u0435 \u043f\u043e\u043b\u0443\u0447\u0435\u043d \u043e\u0442\u0432\u0435\u0442 \u043e\u0442 ElasticSearch         .mergeWith(mergeFlux)             \/\/ \u041e\u0442\u043f\u0440\u0430\u0432\u043a\u0430 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432 \u0432 ElasticSearch         .transform(postBulk(task))         .subscribeOn(Schedulers.single())         .doOnNext(testAliveResponses(task))         .doOnSubscribe(p-> p.request(appConfig.getMaxSizeBuffer() * 2))         .doOnComplete(() -> { logger.info(\"Start: {} End: {} read {} write {}\",                 formatDate(task.getStartDate()),                 formatDate(new Date()),                 task.getDocumentsRead(),                 task.getIndexesWrite(), getMaxProcessingRequest());             fileStorage.writeCollection2Files(waitingForResponse);             removeTask(task);         }); }<\/code><\/pre>\n<p>\u041c\u0435\u0442\u043e\u0434\u043e\u043c transform Reactor \u0441\u043e\u0435\u0434\u0438\u043d\u044f\u0435\u043c \u043e\u0442\u0434\u0435\u043b\u044c\u043d\u044b\u0435 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u0447\u0438\u043a\u0438 \u043f\u043e\u0442\u043e\u043a\u043e\u0432. \u0417\u0434\u0435\u0441\u044c \u0435\u0441\u0442\u044c \u043e\u0434\u043d\u043e \u0441\u0443\u0449\u0435\u0441\u0442\u0432\u0435\u043d\u043d\u043e\u0435 \u043e\u0433\u0440\u0430\u043d\u0438\u0447\u0435\u043d\u0438\u0435: \u0432\u0445\u043e\u0434\u044f\u0449\u0438\u0439 \u0438 \u0438\u0441\u0445\u043e\u0434\u044f\u0449\u0438\u0439 \u043f\u043e\u0442\u043e\u043a\u0438 \u0434\u043e\u043b\u0436\u043d\u044b \u0431\u044b\u0442\u044c \u043e\u0434\u043d\u043e\u0442\u0438\u043f\u043d\u044b\u043c\u0438 (Flux \u0438\u043b\u0438 ParallelFlux). \u041d\u0435\u043b\u044c\u0437\u044f, \u043d\u0430\u043f\u0440\u0438\u043c\u0435\u0440, \u0441 \u043f\u043e\u043c\u043e\u0449\u044c\u044e transform \u0432\u0441\u0442\u0440\u043e\u0438\u0442\u044c \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u0447\u0438\u043a \u0443 \u043a\u043e\u0442\u043e\u0440\u043e\u0433\u043e \u0432\u0445\u043e\u0434 Flux, \u0430 \u0432\u044b\u0445\u043e\u0434 ParallelFlux.<\/p>\n<p>\u0412 \u043c\u0435\u0442\u043e\u0434 subscribe \u0441\u0435\u0440\u0432\u0438\u0441\u0430 \u0438\u043d\u043a\u0430\u043f\u0441\u0443\u043b\u0438\u0440\u043e\u0432\u0430\u043d\u0430 \u043f\u043e\u0434\u043f\u0438\u0441\u043a\u0430 \u043d\u0430 \u043f\u043e\u0442\u043e\u043a. \u041d\u0438\u0436\u0435 \u043f\u0440\u0438\u0432\u0435\u0434\u0435\u043d\u0430 \u0435\u0433\u043e \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044f:<\/p>\n<pre><code>private Disposable subscribe(Flux&lt;Tuple2&lt;String,Document>> events, Task task) {     return  events         .subscribe(             p -> {                 if(isNull(task.getMongoElasticIndex())) { \/\/ \u0415\u0441\u043b\u0438 \u0437\u0430\u0434\u0430\u0447\u0430 \u043d\u0435 \u043f\u0435\u0440\u0435\u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u044f                     waitingForResponse.remove(p.getT1());                 }                 int count = Optional.ofNullable(p.getT2().get(\"items\", List.class))                         .map(List::size)                         .orElse(0);                 task.addIndexesWrite(count);             },             e -> {                 if(task != rabbitMQTask)removeTask(task);                 fileStorage.writeCollection2Files(waitingForResponse);                 logger.error(\"Error: {}\", e.getMessage());             }         ); }<\/code><\/pre>\n<p>\u0414\u0430\u043b\u0435\u0435 \u043a\u043e\u0440\u043e\u0442\u043a\u043e \u043e\u0431 \u043e\u0442\u0434\u0435\u043b\u044c\u043d\u044b\u0445 \u0444\u0443\u043d\u043a\u0446\u0438\u044f\u0445 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0438 \u043f\u043e\u0442\u043e\u043a\u0430.<\/p>\n<h3>\u0417\u0430\u0433\u0440\u0443\u0437\u043a\u0430 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432<\/h3>\n<p>\u0418\u043c\u0435\u044e\u0442\u0441\u044f \u0434\u0432\u0430 \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0430 \u0437\u0430\u0433\u0440\u0443\u0437\u043a\u0438: <\/p>\n<ul>\n<li>\n<p>\u0414\u043b\u044f \u0432\u0441\u0435\u0445 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432 \u043e\u0441\u043d\u043e\u0432\u043d\u043e\u0439 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0438 \u0438\u043d\u0434\u0435\u043a\u0441\u0430<\/p>\n<\/li>\n<\/ul>\n<pre><code>ParallelFlux dataEventsFlux = reactorRepositoryMongoDB         .findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())         .parallel(appConfig.getIndexParallelism())         .runOn(Schedulers.boundedElastic());<\/code><\/pre>\n<p>\u041c\u0435\u0442\u043e\u0434 findAll \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u043f\u043e\u0442\u043e\u043a \u0434\u043b\u044f \u0432\u0441\u0435\u0445 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0438. Parallel \u0438 runOn \u043d\u0430\u0441\u0442\u0440\u0430\u0438\u0432\u0430\u044e\u0442 \u043c\u043d\u043e\u0433\u043e\u043f\u043e\u0442\u043e\u0447\u043d\u043e\u0441\u0442\u044c \u0434\u043b\u044f \u0432\u044b\u0431\u043e\u0440\u043a\u0438 \u0438 \u0434\u0430\u043b\u044c\u043d\u0435\u0439\u0448\u0435\u0439 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0438.   <\/p>\n<ul>\n<li>\n<p>\u0414\u043b\u044f \u0435\u0434\u0438\u043d\u0438\u0447\u043d\u043e\u0433\u043e \u0437\u0430\u043f\u0440\u043e\u0441\u0430, \u043f\u0440\u0438\u0445\u043e\u0434\u044f\u0449\u0435\u043c\u0443 \u0438\u0437 \u043e\u0447\u0435\u0440\u0435\u0434\u0438   <\/p>\n<\/li>\n<\/ul>\n<pre><code>ParallelFlux dataEventsFlux = reactiveQueue.inboundFlux()         .parallel(appConfig.getIndexParallelism())         .runOn(Schedulers.boundedElastic())         .map(msg -> {             IndexEvent indexEvent = reactiveQueue.msg2IndexEvent(msg);             try {                 return CreateIndexItem(indexEvent);             } catch (IllegalObjectIdException | IOException | ConvertDataException e) {                 logger.error(\"{} For message: {}\", String.join(\", \",throwable2ListMessage(e)),                         new String(msg.getBody(), StandardCharsets.UTF_8));                 return new IndexItem(null, null, null);             }         })         .filter(e -> nonNull(e.getAction()))         .flatMap(item ->             Flux.zip(\"delete\".equals(item.getAction())                         \/\/ \u0414\u043b\u044f \u043e\u043f\u0435\u0440\u0430\u0446\u0438\u0438 \u0443\u0434\u0430\u043b\u0435\u043d\u0438\u044f \u0441\u043e\u0437\u0434\u0430\u0451\u0442\u0441\u044f Document, \u0441\u043e\u0434\u0435\u0440\u0436\u0430\u0449\u0438\u0439 _id \u0443\u0434\u0430\u043b\u044f\u0435\u043c\u043e\u0433\u043e \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430                     ? Flux.just(new Document().append(\"_id\", item.getIdDocument().get(\"_id\")))                         \/\/ \u0414\u043b\u044f \u043e\u043f\u0435\u0440\u0430\u0446\u0438\u0438 \u043e\u0431\u043d\u043e\u0432\u043b\u0435\u043d\u0438\u044f \u0438\u043d\u0434\u0435\u043a\u0441\u0430 Document \u0437\u0430\u0433\u0440\u0443\u0436\u0430\u0435\u0442\u0441\u044f \u0438\u0437 \u0431\u0430\u0437\u044b \u0434\u0430\u043d\u043d\u044b\u0445                     : reactorRepositoryMongoDB.find(                         item.getMongoElasticIndex().getCollection(),                         item.getIdDocument(),                         item.getMongoElasticIndex().getProjection()),                 Flux.just(item)             )             .map(d -> new EventDocument(d.getT2().getAction(),                     d.getT1(),                     d.getT2().getMongoElasticIndex()))         );<\/code><\/pre>\n<p>\u041c\u0435\u0442\u043e\u0434 inboundFlux \u0438\u043d\u0442\u0435\u0440\u0444\u0435\u0439\u0441\u0430 reactiveQueue \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u043f\u043e\u0442\u043e\u043a \u0434\u043b\u044f \u043e\u0447\u0435\u0440\u0435\u0434\u0438. Parallel \u0438 runOn \u0438\u0434\u0435\u043d\u0442\u0438\u0447\u043d\u044b \u043f\u0440\u0435\u0434\u044b\u0434\u0443\u0449\u0435\u043c\u0443 \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0443. \u0414\u0430\u043b\u0435\u0435 \u0441\u043e\u0431\u044b\u0442\u0438\u0435 \u043f\u0440\u0435\u043e\u0431\u0440\u0430\u0437\u0443\u0435\u0442\u0441\u044f \u0438\u0437 JSON \u0432 \u043e\u0431\u044a\u0435\u043a\u0442 IndexEvent, \u043f\u043e \u0441\u043e\u0434\u0435\u0440\u0436\u0438\u043c\u043e\u043c\u0443 \u043a\u043e\u0442\u043e\u0440\u043e\u0433\u043e \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442 \u0438\u0437\u0432\u043b\u0435\u043a\u0430\u044e\u0442\u0441\u044f \u0438\u0437 \u0431\u0430\u0437\u044b, \u0438\u043b\u0438 \u0441\u043e\u0437\u0434\u0430\u0451\u0442\u0441\u044f \u043e\u0431\u044a\u0435\u043a\u0442 \u0434\u043b\u044f \u0443\u0434\u0430\u043b\u0435\u043d\u0438\u044f \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430 \u0438\u0437 ElasticSearch.<\/p>\n<h3>\u0414\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u043a \u0438\u0441\u0445\u043e\u0434\u043d\u043e\u043c\u0443 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0443 \u0438\u0437 \u043f\u0440\u0438\u0441\u043e\u0435\u0434\u0438\u043d\u044f\u0435\u043c\u044b\u0445 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0439   <\/h3>\n<pre><code>private &lt;T> Function&lt;ParallelFlux&lt;T>, ParallelFlux&lt;T>>     joinData(Function&lt;T, Document> getDocument,             Function&lt;T, MongoElasticIndex> getMongoElasticIndex) {     return (ParallelFlux&lt;T> items) ->             items.flatMap(p -> {             if(getDocument.apply(p).size() == 1) {                 return Flux.just(p);             }             return                 Flux.fromIterable(getMongoElasticIndex.apply((T) p).getJoinConditions(getDocument.apply(p)))                         .flatMap(it -> Flux.zip(Flux.just(it.getCollection().getJoinedFieldName()),                                 reactorRepositoryMongoDB.find(getMongoElasticIndex.apply((T) p).getCollection(),                                         it.getCondition(),                                         it.getCollection().getProjection())))                         .reduce(p, (acc, t) -> {                             getDocument.apply(acc).put(t.getT1(), t.getT2());                             return acc;                         });                 }         ); }<\/code><\/pre>\n<p>\u041c\u0435\u0442\u043e\u0434 joinData \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u044c\u043d\u044b\u0439 \u043e\u0431\u044a\u0435\u043a\u0442, \u0434\u043e\u0431\u0430\u0432\u043b\u044f\u044e\u0449\u0438\u0439 \u0434\u0430\u043d\u043d\u044b\u0435 \u043a \u0438\u0441\u0445\u043e\u0434\u043d\u043e\u043c\u0443 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0443 \u0438\u0437 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432 \u043f\u0440\u0438\u0441\u043e\u0435\u0434\u0438\u043d\u044f\u0435\u043c\u044b\u0445 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0439. \u0418\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435 flatMap \u0438 Flux.zip \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u0435\u0442 \u0430\u0441\u0438\u043d\u0445\u0440\u043e\u043d\u043d\u043e \u0437\u0430\u043f\u0443\u0441\u043a\u0430\u0442\u044c \u0438 \u043e\u0431\u0440\u0430\u0431\u0430\u0442\u044b\u0432\u0430\u0442\u044c \u043f\u043e\u0442\u043e\u043a\u0438, \u0432 \u0442\u043e\u043c \u0447\u0438\u0441\u043b\u0435 \u0438 \u043f\u043e\u0442\u043e\u043a\u0438, \u0441\u043e\u0437\u0434\u0430\u0432\u0430\u0435\u043c\u044b\u0435 \u0437\u0430\u043f\u0440\u043e\u0441\u0430\u043c\u0438 \u043a \u0431\u0430\u0437\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 mongodb. \u0412\u0441\u0435 \u0432\u043e\u043f\u0440\u043e\u0441\u044b, \u0441\u0432\u044f\u0437\u0430\u043d\u043d\u044b\u0435 \u0441 \u0441\u0438\u043d\u0445\u0440\u043e\u043d\u0438\u0437\u0430\u0446\u0438\u0435\u0439, \u0431\u0435\u0440\u0435\u0442 \u043d\u0430 \u0441\u0435\u0431\u044f Reactor.<\/p>\n<h3>\u0413\u0435\u043d\u0435\u0440\u0430\u0446\u0438\u044f JSON \u0434\u043b\u044f ElasticSearch<\/h3>\n<pre><code>private &lt;T> Function&lt;ParallelFlux&lt;T>, ParallelFlux&lt;String>>     document2ElasticJson(             Function&lt;T, String> getAction,             Function&lt;T, Document> getDocument,             Function&lt;T, MongoElasticIndex> getMongoElasticIndex) {     return (ParallelFlux&lt;T> items) -> items.map(item -> {         String elasticSend;         try {             Document document = getDocument.apply(item);             MongoElasticIndex mongoElasticIndex = getMongoElasticIndex.apply(item);             elasticSend = \"delete\".equals(getAction.apply(item))                     ? mongoElasticIndex.deleteBuild(document)                     : mongoElasticIndex.indexBuild(document);         } catch (ConvertDataException e) {             throw new RuntimeException(e);         } catch (JsonProcessingException e) {             throw new UncheckedIOException(e);         }         return elasticSend;     }); } <\/code><\/pre>\n<p>\u0418\u0437 \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u043d\u043e\u0433\u043e \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430 \u0444\u043e\u0440\u043c\u0438\u0440\u0443\u0435\u0442\u0441\u044f JSON-\u043e\u0431\u044a\u0435\u043a\u0442 \u043c\u043e\u0434\u0438\u0444\u0438\u043a\u0430\u0446\u0438\u0438 \u0438\u043d\u0434\u0435\u043a\u0441\u0430 \u0432 ElasticSearch.  \u041a\u043e\u043d\u0442\u0440\u043e\u043b\u0438\u0440\u0443\u0435\u043c\u044b\u0435 \u0438\u0441\u043a\u043b\u044e\u0447\u0435\u043d\u0438\u044f \u043f\u0440\u0438\u0445\u043e\u0434\u0438\u0442\u0441\u044f \u043a\u043e\u043d\u0432\u0435\u0440\u0442\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0432 \u043d\u0435\u043a\u043e\u043d\u0442\u0440\u043e\u043b\u0438\u0440\u0443\u0435\u043c\u044b\u0435. <\/p>\n<h3>\u0410\u0433\u0440\u0435\u0433\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043b\u044f _bulk-\u0437\u0430\u043f\u0440\u043e\u0441\u0430<\/h3>\n<pre><code>Function&lt;Flux&lt;String>, Flux&lt;String>> grouping(Task task) {     return (Flux&lt;String> source) -> source             .bufferTimeout(appConfig.getMaxSizeBuffer(),                     Duration.ofMillis(appConfig.getMaxDurationBuffer()))             .doOnNext(p -> task.addDocumentsRead(p.size()))             .map(p -> String.join(\"\\n\", p)             ); }<\/code><\/pre>\n<p>\u0418\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435 _bulk-\u0437\u0430\u043f\u0440\u043e\u0441\u0430 \u043a ElasticSearch \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u0435\u0442 \u0441\u0443\u0449\u0435\u0441\u0442\u0432\u0435\u043d\u043d\u043e \u0441\u043d\u0438\u0437\u0438\u0442\u044c \u0442\u0440\u0430\u0444\u0438\u043a \u0438 \u043f\u043e\u0432\u044b\u0441\u0438\u0442\u044c \u043f\u0440\u043e\u0438\u0437\u0432\u043e\u0434\u0438\u0442\u0435\u043b\u044c\u043d\u043e\u0441\u0442\u044c \u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438. \u041e\u0431\u044a\u0435\u0434\u0438\u043d\u0435\u043d\u0438\u0435 \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u044f\u0435\u043c\u044b\u0445 \u0434\u0430\u043d\u043d\u044b\u0445 \u043d\u0435\u0441\u043b\u043e\u0436\u043d\u043e \u0441\u0434\u0435\u043b\u0430\u0442\u044c \u043f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438  bufferTimeout. \u0417\u043d\u0430\u0447\u0435\u043d\u0438\u044f\u043c\u0438 \u043c\u0430\u043a\u0441\u0438\u043c\u0430\u043b\u044c\u043d\u043e\u0433\u043e \u0440\u0430\u0437\u043c\u0435\u0440\u0430 \u0431\u0443\u0444\u0435\u0440\u0430 \u0438 \u0432\u0440\u0435\u043c\u0435\u043d\u0438 \u043e\u0436\u0438\u0434\u0430\u043d\u0438\u044f \u043c\u043e\u0436\u043d\u043e \u043d\u0430\u0439\u0442\u0438 \u043a\u043e\u043c\u043f\u0440\u043e\u043c\u0438\u0441\u0441 \u043c\u0435\u0436\u0434\u0443 \u043e\u043f\u0435\u0440\u0430\u0442\u0438\u0432\u043d\u043e\u0441\u0442\u044c\u044e \u043e\u0431\u043d\u043e\u0432\u043b\u0435\u043d\u0438\u044f \u0434\u0430\u043d\u043d\u044b\u0445 \u0432 ElasticSearch, \u0440\u0430\u0437\u043c\u0435\u0440\u043e\u043c \u0437\u0430\u043f\u0440\u043e\u0441\u0430 \u0438 \u043f\u0440\u043e\u0438\u0437\u0432\u043e\u0434\u0438\u0442\u0435\u043b\u044c\u043d\u043e\u0441\u0442\u044c\u044e.<\/p>\n<h3>\u041e\u0442\u043f\u0440\u0430\u0432\u043a\u0430 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432 ElasticSearch   <\/h3>\n<pre><code>public Function&lt;Flux&lt;String>, Flux&lt;Tuple2&lt;String, Document>>> postBulk(Task task) {     return (Flux&lt;String> source) -> source         .flatMap(buffer -> {             if(isNull(task.getMongoElasticIndex())) { \/\/ \u0415\u0441\u043b\u0438 \u0437\u0430\u0434\u0430\u0447\u0430 \u043d\u0435 \u043f\u0435\u0440\u0435\u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u044f                 waitingForResponse.add(buffer);             }             return Flux.zip(Flux.just(buffer),                 webClientElastic.post()                     .uri(\"\/_bulk\")                     .body(BodyInserters.fromValue(buffer))                     .retrieve()                     .onStatus(httpStatus -> httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),                             response -> Mono.error(new HttpServiceException(\"System is overloaded\",                                     response.rawStatusCode())))                     .onStatus(httpStatus -> httpStatus.is4xxClientError() &amp;&amp; !httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),                             response -> Mono.error(new RuntimeException(\"API not found\")))                     .onStatus(HttpStatus::is5xxServerError,                             response -> Mono.error(new HttpServiceException(\"Server is not responding\",                                     response.rawStatusCode())))                     .bodyToFlux(Document.class)                     .retryWhen(Retry.backoff(appConfig.getWebClientRetryMaxAttempts(),                                 Duration.ofSeconds(appConfig.getWebClientRetryMinBackoff()))                         .filter(throwable -> throwable instanceof HttpServiceException)                         .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {                             throw new HttpServiceException(\"External Service failed to process after max retries\",                                     HttpStatus.SERVICE_UNAVAILABLE.value());                         }))             );         });  }<\/code><\/pre>\n<p>\u0421\u043e\u0437\u0434\u0430\u0451\u0442\u0441\u044f \u043f\u043e\u0442\u043e\u043a, \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u044f\u044e\u0449\u0438\u0439 \u0447\u0435\u0440\u0435\u0437 WebClient \u0437\u0430\u043f\u0440\u043e\u0441\u044b \u043a ElasticSearch. \u041f\u043e\u0442\u043e\u043a, \u0444\u043e\u0440\u043c\u0438\u0440\u0443\u0435\u043c\u044b\u0439 \u043c\u0435\u0442\u043e\u0434\u043e\u043c post WebClient\u2019\u0430, Flux.zip \u043e\u0431\u044a\u0435\u0434\u0438\u043d\u044f\u0435\u0442 \u0441 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u043c, \u044d\u0442\u043e \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u0435\u0442 \u043f\u0440\u0438 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0435 \u043e\u0442\u0432\u0435\u0442\u0430 \u0441\u0432\u044f\u0437\u0430\u0442\u044c \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u043d\u044b\u0439 \u043e\u0442\u0432\u0435\u0442 \u0441 \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u0435\u043d\u043d\u044b\u043c \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u043c. \u0421 \u043f\u043e\u043c\u043e\u0449\u044c\u044e retryWhen, Retry.backoff \u043d\u0430\u0441\u0442\u0440\u043e\u0435\u043d\u0430 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0430 \u043d\u0435\u043a\u043e\u0442\u043e\u0440\u044b\u0445 \u043e\u0448\u0438\u0431\u043e\u043a. <\/p>\n<h3>\u041e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0430 \u043e\u0442\u0432\u0435\u0442\u043e\u0432 ElasticSearch   <\/h3>\n<pre><code>private Disposable subscribe(Flux&lt;Tuple2&lt;String,Document>> events, Task task) {     return  events         .subscribe(             p -> {                 if(isNull(task.getMongoElasticIndex())) { \/\/ \u0415\u0441\u043b\u0438 \u0437\u0430\u0434\u0430\u0447\u0430 \u043d\u0435 \u043f\u0435\u0440\u0435\u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u044f                     waitingForResponse.remove(p.getT1());                 }                 int count = Optional.ofNullable(p.getT2().get(\"items\", List.class))                         .map(List::size)                         .orElse(0);                 task.addIndexesWrite(count);             },             e -> {                 if(task != rabbitMQTask)removeTask(task);                 fileStorage.writeCollection2Files(waitingForResponse);                 logger.error(\"Error: {}\", e.getMessage());             }         ); }<\/code><\/pre>\n<p>\u041e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0430 \u043e\u0442\u0432\u0435\u0442\u043e\u0432 ElasticSearch \u043c\u0438\u043d\u0438\u043c\u0430\u043b\u044c\u043d\u0430. \u0415\u0441\u043b\u0438 \u043e\u0442\u0432\u0435\u0442 \u043f\u043e\u043b\u0443\u0447\u0435\u043d \u043d\u0430 \u043a\u043e\u043d\u0442\u0440\u043e\u043b\u0438\u0440\u0443\u0435\u043c\u044b\u0439 \u0437\u0430\u043f\u0440\u043e\u0441 (\u043d\u0435 \u0437\u0430\u043f\u0440\u043e\u0441 \u043d\u0430 \u043f\u0435\u0440\u0435\u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u044e), \u0442\u043e \u0437\u0430\u043f\u0440\u043e\u0441 \u0443\u0434\u0430\u043b\u044f\u0435\u0442\u0441\u044f \u0438\u0437 \u043c\u043d\u043e\u0436\u0435\u0441\u0442\u0432\u0430 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432, \u0434\u043b\u044f \u043a\u043e\u0442\u043e\u0440\u044b\u0445 \u043a\u043e\u043d\u0442\u0440\u043e\u043b\u0438\u0440\u0443\u0435\u0442\u0441\u044f \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u0438\u0435 \u043e\u0442\u0432\u0435\u0442\u0430. \u041e\u0442\u0432\u0435\u0442\u044b \u043d\u0430 \u043f\u0435\u0440\u0435\u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u044e \u043d\u0435 \u043a\u043e\u043d\u0442\u0440\u043e\u043b\u0438\u0440\u0443\u044e\u0442\u0441\u044f. \u0412 \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u043d\u043e\u043c \u043e\u0442\u0432\u0435\u0442\u0435 \u0430\u0442\u0440\u0438\u0431\u0443\u0442 items \u0434\u043e\u043b\u0436\u0435\u043d \u0431\u044b\u0442\u044c \u0441\u043f\u0438\u0441\u043a\u043e\u043c, \u0441\u043e\u0434\u0435\u0440\u0436\u0430\u0449\u0438\u043c \u0438\u043d\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u044e \u043e\u0431 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u0430\u043d\u043d\u044b\u0445 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430\u0445. \u041d\u0430 \u043a\u043e\u043b\u0438\u0447\u0435\u0441\u0442\u0432\u043e \u044d\u043b\u0435\u043c\u0435\u043d\u0442\u043e\u0432 \u0432 \u0441\u043f\u0438\u0441\u043a\u0435 \u0443\u0432\u0435\u043b\u0438\u0447\u0438\u0432\u0430\u0435\u0442\u0441\u044f \u0441\u0447\u0435\u0442\u0447\u0438\u043a \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u0430\u043d\u043d\u044b\u0445 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432.<\/p>\n<h3>\u041d\u0430\u0441\u0442\u0440\u043e\u0439\u043a\u0430 WebClient   <\/h3>\n<p>\u041e\u0441\u043d\u043e\u0432\u043d\u0430\u044f \u0447\u0430\u0441\u0442\u044c \u043d\u0430\u0441\u0442\u0440\u043e\u0439\u043a\u0438 \u0434\u0435\u043b\u0430\u0435\u0442\u0441\u044f \u0432 \u043a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u043e\u043d\u043d\u043e\u043c \u043a\u043b\u0430\u0441\u0441\u0435, \u0431\u0438\u043d \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u043e\u0431\u044a\u0435\u043a\u0442 WebClient.Builder:<\/p>\n<pre><code>@Bean @Qualifier(\"elastic\") public WebClient.Builder webClientWithTimeout() {     final TcpClient tcpClient = TcpClient             .create()             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)             .doOnConnected(connection -> {                 connection.addHandlerLast(new ReadTimeoutHandler(timeout, TimeUnit.MILLISECONDS));                 connection.addHandlerLast(new WriteTimeoutHandler(timeout, TimeUnit.MILLISECONDS));             });      return WebClient.builder()             .baseUrl(baseUrl +\":\" + port.toString())             .filter(basicAuthentication(user, password))             .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); } <\/code><\/pre>\n<p>\u0412 \u043a\u043e\u043d\u0441\u0442\u0440\u0443\u043a\u0442\u043e\u0440\u0435 \u0441\u0435\u0440\u0432\u0438\u0441\u0430 \u0434\u043e\u0431\u0430\u0432\u043b\u044f\u044e\u0442\u0441\u044f \u0444\u0438\u043b\u044c\u0442\u0440\u044b, \u0432\u044b\u0437\u044b\u0432\u0430\u0435\u043c\u044b\u0435 \u043f\u0440\u0438 \u043e\u0442\u043f\u0440\u0430\u0432\u043a\u0435 \u0437\u0430\u043f\u0440\u043e\u0441\u0430 \u0438 \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u0438\u0438 \u043e\u0442\u0432\u0435\u0442\u0430: <\/p>\n<pre><code>this.webClientElastic = webClientElastic         .filter(onRequest())         .filter(onResponse())         .build();<\/code><\/pre>\n<p>\u041c\u0435\u0442\u043e\u0434\u044b, \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u044e\u0449\u0438\u0435 \u0444\u0438\u043b\u044c\u0442\u0440\u044b:<\/p>\n<pre><code>private ExchangeFilterFunction onRequest() {     return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {         addSendRequest();         int sleepCycleCount = 0;         while (getProcessingRequest() > getMaxProcessingRequest()) {             try {                 logger.info(\"Sleep: {} ProcessingRequest reached {} (MaxProcessingRequest {})\", getSleepOverRequest(),                         getProcessingRequest() - 1, getMaxProcessingRequest());                 sleep(getSleepOverRequest());                 if (sleepCycleCount++ > appConfig.getSleepCycleCountMax()) {                     break;                 }             } catch (InterruptedException e) {                 e.printStackTrace();             }         }         logger.info(\"Request: {} {}\", clientRequest.method(), clientRequest.url());         return Mono.just(clientRequest);     }); }  private ExchangeFilterFunction onResponse() {     return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {         addReceiveResponse();         logger.info(\"Response Status {}\", clientResponse.statusCode());         return Mono.just(clientResponse);     }); } <\/code><\/pre>\n<p>\u0424\u0438\u043b\u044c\u0442\u0440\u044b \u0432\u044b\u0432\u043e\u0434\u044f\u0442 \u0438\u043d\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u044e \u043e\u0431 \u043e\u0442\u043f\u0440\u0430\u0432\u043a\u0435 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432, \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u0438\u0438 \u043e\u0442\u0432\u0435\u0442\u043e\u0432 \u0438 \u043c\u043e\u0434\u0438\u0444\u0438\u0446\u0438\u0440\u0443\u044e\u0442 \u0441\u0447\u0435\u0442\u0447\u0438\u043a\u0438 \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u0435\u043d\u043d\u044b\u0445 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432 \u0438 \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u043d\u044b\u0445 \u043e\u0442\u0432\u0435\u0442\u043e\u0432. \u041f\u0435\u0440\u0435\u0434 \u043e\u0442\u043f\u0440\u0430\u0432\u043a\u043e\u0439 \u0437\u0430\u043f\u0440\u043e\u0441\u0430, \u0435\u0441\u043b\u0438 \u043f\u0440\u0435\u0432\u044b\u0448\u0435\u043d\u043e \u043a\u043e\u043b\u0438\u0447\u0435\u0441\u0442\u0432\u043e \u043d\u0435 \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u043d\u044b\u0445 \u043e\u0442\u0432\u0435\u0442\u043e\u0432, \u043f\u0440\u043e\u0446\u0435\u0441\u0441 \u201c\u0437\u0430\u0441\u044b\u043f\u0430\u0435\u0442\u201d \u043d\u0430 \u043d\u0435\u043a\u043e\u0442\u043e\u0440\u043e\u0435 \u0432\u0440\u0435\u043c\u044f.<\/p>\n<h3>\u041d\u0430\u0441\u0442\u0440\u043e\u0439\u043a\u0430 \u0441\u0440\u0435\u0434\u044b \u0432\u044b\u043f\u043e\u043b\u043d\u0435\u043d\u0438\u044f<\/h3>\n<p>\u0414\u043b\u044f \u0442\u043e\u0433\u043e \u0447\u0442\u043e\u0431\u044b \u0437\u0430\u043f\u0443\u0441\u0442\u0438\u0442\u044c \u044d\u0442\u043e\u0442 \u0441\u0435\u0440\u0432\u0438\u0441 \u043d\u0430\u043c \u043d\u0443\u0436\u043d\u044b: rabbitmq, mongodb \u0438 elasticsearch. \u0412\u0441\u0451 \u044d\u0442\u043e \u043f\u0440\u043e\u0449\u0435 \u0443\u0441\u0442\u0430\u043d\u043e\u0432\u0438\u0442\u044c \u0432 Docker. \u0415\u0449\u0451 \u0432 \u0441\u0430\u043c\u043e\u043c \u043d\u0430\u0447\u0430\u043b\u0435 \u043f\u0440\u043e\u0435\u043a\u0442\u0430 \u0443\u0441\u0442\u0430\u043d\u043e\u0432\u0438\u043b Docker Desktop \u0438 \u043d\u0430\u0441\u0442\u0440\u043e\u0438\u043b \u043a\u043e\u043d\u0442\u0435\u0439\u043d\u0435\u0440\u044b \u0434\u043b\u044f \u0437\u0430\u043f\u0443\u0441\u043a\u0430 \u043d\u0443\u0436\u043d\u044b\u0445 c\u0435\u0440\u0432\u0438\u0441\u043e\u0432. \u041a\u0430\u043a \u044d\u0442\u043e \u0434\u0435\u043b\u0430\u0435\u0442\u0441\u044f \u043c\u043e\u0436\u043d\u043e \u043f\u043e\u0441\u043c\u043e\u0442\u0440\u0435\u0442\u044c, \u043d\u0430\u043f\u0440\u0438\u043c\u0435\u0440, \u0432 \u044d\u0442\u043e\u0439 <a href=\"https:\/\/habr.com\/ru\/post\/671344\/\" rel=\"noopener noreferrer nofollow\">\u0441\u0442\u0430\u0442\u044c\u0435<\/a>. \u041f\u043e \u0430\u043d\u0430\u043b\u043e\u0433\u0438\u0438 \u0443\u0441\u0442\u0430\u043d\u043e\u0432\u0438\u043b rabbitmq, mongodb. \u0414\u043e\u0431\u0430\u0432\u0438\u043b \u043a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u043e\u043d\u043d\u044b\u0435 \u0444\u0430\u0439\u043b\u044b \u0438 \u0432\u043d\u0435\u0441 \u0438\u0437\u043c\u0435\u043d\u0435\u043d\u0438\u044f \u0432 \u0444\u0430\u0439\u043b docker-compose.yml. \u041f\u043e\u043b\u0443\u0447\u0438\u0432\u0448\u0438\u0435\u0441\u044f \u043d\u0430\u0441\u0442\u0440\u043e\u0439\u043a\u0438 Docker \u043c\u043e\u0436\u043d\u043e \u043d\u0430\u0439\u0442\u0438 \u0432 \u043f\u0430\u043f\u043a\u0435 \u043f\u0440\u043e\u0435\u043a\u0442\u0430 docker-elk. \u041d\u0438\u0436\u0435 \u0441\u043a\u0440\u0438\u043d\u0448\u043e\u0442 \u0437\u0430\u043f\u0443\u0449\u0435\u043d\u043d\u043e\u0433\u043e \u043a\u043e\u043d\u0442\u0435\u0439\u043d\u0435\u0440\u0430:<\/p>\n<figure class=\"full-width\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/habrastorage.org\/r\/w1560\/getpro\/habr\/upload_files\/f57\/64d\/455\/f5764d4552940dbd209f971ffd782029.png\" width=\"974\" height=\"553\" data-src=\"https:\/\/habrastorage.org\/getpro\/habr\/upload_files\/f57\/64d\/455\/f5764d4552940dbd209f971ffd782029.png\"\/><figcaption><\/figcaption><\/figure>\n<h3>\u0417\u0430\u043f\u0443\u0441\u043a \u0441\u0435\u0440\u0432\u0438\u0441\u0430<\/h3>\n<p>\u0414\u043b\u044f \u0442\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u044f \u0437\u0430\u0433\u0440\u0443\u0437\u0438\u043b \u0432 \u0431\u0430\u0437\u0443 mongodb 1000 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432.  \u0418\u0437 Postman \u0438 \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u044f\u044e \u0437\u0430\u043f\u0440\u043e\u0441: <\/p>\n<figure class=\"full-width\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/habrastorage.org\/r\/w1560\/getpro\/habr\/upload_files\/7ea\/e2d\/3e4\/7eae2d3e452a38bf88dd5c23b6279fbd.png\" width=\"974\" height=\"478\" data-src=\"https:\/\/habrastorage.org\/getpro\/habr\/upload_files\/7ea\/e2d\/3e4\/7eae2d3e452a38bf88dd5c23b6279fbd.png\"\/><figcaption><\/figcaption><\/figure>\n<p>\u0412 \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u043d\u043e\u043c \u043b\u043e\u0433\u0435 \u0432\u0438\u0434\u043d\u043e, \u0447\u0442\u043e \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0430 \u0432\u044b\u043f\u043e\u043b\u043d\u044f\u0435\u0442\u0441\u044f \u0432 \u0440\u0430\u0437\u043d\u044b\u0445 \u043f\u043e\u0442\u043e\u043a\u0430\u0445:<\/p>\n<pre><code>2022-11-02 15:23:17.396  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.418  INFO 8336 --- [      Thread-42] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.447  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.462  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.475  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.477  INFO 8336 --- [      Thread-33] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:855, serverValue:83}] to localhost:27017 2022-11-02 15:23:17.484  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.485  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.489  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.493  INFO 8336 --- [       Thread-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:856, serverValue:84}] to localhost:27017 2022-11-02 15:23:17.566  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.588  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.622  INFO 8336 --- [       Thread-4] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.651  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.651  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.658  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.658  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.673  INFO 8336 --- [      Thread-52] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:940, serverValue:85}] to localhost:27017 2022-11-02 15:23:17.676  INFO 8336 --- [       Thread-7] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:941, serverValue:86}] to localhost:27017 2022-11-02 15:23:17.715  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.738  INFO 8336 --- [       Thread-5] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.748  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.772  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.790  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.811  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.818  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.849  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:993, serverValue:87}] to localhost:27017 2022-11-02 15:23:17.851  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:994, serverValue:88}] to localhost:27017 2022-11-02 15:23:17.895  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.898  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:17.911  INFO 8336 --- [       Thread-6] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.926  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.944  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.966  INFO 8336 --- [      Thread-36] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:17.993  INFO 8336 --- [      Thread-36] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1078, serverValue:90}] to localhost:27017 2022-11-02 15:23:18.002  INFO 8336 --- [      Thread-48] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1079, serverValue:89}] to localhost:27017 2022-11-02 15:23:18.041  INFO 8336 --- [      Thread-34] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.044  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.059  INFO 8336 --- [      Thread-31] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:18.076  INFO 8336 --- [       Thread-7] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:18.083  INFO 8336 --- [ctor-http-nio-1] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.096  INFO 8336 --- [ctor-http-nio-2] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.135  INFO 8336 --- [      Thread-34] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1156, serverValue:92}] to localhost:27017 2022-11-02 15:23:18.138  INFO 8336 --- [      Thread-39] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1155, serverValue:91}] to localhost:27017 2022-11-02 15:23:18.140  INFO 8336 --- [      Thread-39] ru.mvz.elasticsearch.service.Indexer     : Request: POST http:\/\/localhost:9200\/_bulk 2022-11-02 15:23:18.180  INFO 8336 --- [ctor-http-nio-3] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.180  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Response Status 200 OK 2022-11-02 15:23:18.181  INFO 8336 --- [ctor-http-nio-4] ru.mvz.elasticsearch.service.Indexer     : Start: 2022-11-02 15:23:17.250 End: 2022-11-02 15:23:18.181 read 1000 write 1000<\/code><\/pre>\n<p>\u0422\u0435\u043f\u0435\u0440\u044c \u043f\u0440\u043e\u0432\u0435\u0440\u0438\u043c, \u0447\u0442\u043e \u0437\u0430\u0433\u0440\u0443\u0437\u0438\u043b\u043e\u0441\u044c.<\/p>\n<p>\u0417\u0430\u043f\u0440\u043e\u0441 \u043a ElasticSearch \u043f\u043e\u043a\u0430\u0437\u044b\u0432\u0430\u0435\u0442 \u043d\u0430\u043b\u0438\u0447\u0438\u0435 \u0438\u043d\u0434\u0435\u043a\u0441\u0430 \u0441 1000 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430\u043c\u0438:<\/p>\n<figure class=\"full-width\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/habrastorage.org\/r\/w1560\/getpro\/habr\/upload_files\/7bf\/352\/2b3\/7bf3522b36293bb15f5eea4c4e7dbf5c.png\" width=\"974\" height=\"171\" data-src=\"https:\/\/habrastorage.org\/getpro\/habr\/upload_files\/7bf\/352\/2b3\/7bf3522b36293bb15f5eea4c4e7dbf5c.png\"\/><figcaption><\/figcaption><\/figure>\n<p>\u0418 \u043f\u043e\u043f\u0440\u043e\u0431\u0443\u0435\u043c \u043d\u0430\u0439\u0442\u0438 \u0447\u0442\u043e-\u0442\u043e \u0432 ElasticSearch: <\/p>\n<figure class=\"full-width\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/habrastorage.org\/r\/w1560\/getpro\/habr\/upload_files\/7c2\/137\/6ad\/7c21376ada6c698bd0f87c8ef34cc505.png\" width=\"974\" height=\"626\" data-src=\"https:\/\/habrastorage.org\/getpro\/habr\/upload_files\/7c2\/137\/6ad\/7c21376ada6c698bd0f87c8ef34cc505.png\"\/><figcaption><\/figcaption><\/figure>\n<p>\u041f\u043e\u043b\u0443\u0447\u0435\u043d \u043e\u0442\u0432\u0435\u0442 ElasticSearch \u0441 \u043d\u0430\u0439\u0434\u0435\u043d\u043d\u044b\u043c \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u043c!<\/p>\n<h2>\u0417\u0430\u043a\u043b\u044e\u0447\u0435\u043d\u0438\u0435<\/h2>\n<p>\u0412 \u044d\u0442\u043e\u043c \u043c\u0430\u0442\u0435\u0440\u0438\u0430\u043b\u0435 \u043c\u043d\u0435 \u0445\u043e\u0442\u0435\u043b\u043e\u0441\u044c \u043f\u0440\u0438\u0432\u0435\u0441\u0442\u0438 \u043f\u0440\u0438\u043c\u0435\u0440 \u0441\u0435\u0440\u0432\u0438\u0441\u0430, \u0440\u0435\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u043d\u043d\u043e\u0433\u043e \u0441 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435\u043c Spring Boot, WebFlux, WebClient, Reactor &#8212; \u043d\u0430\u0434\u0435\u044e\u0441\u044c, \u0447\u0442\u043e \u0443 \u043c\u0435\u043d\u044f \u044d\u0442\u043e \u043f\u043e\u043b\u0443\u0447\u0438\u043b\u043e\u0441\u044c. <\/p>\n<p>\u041d\u0435\u0441\u043a\u043e\u043b\u044c\u043a\u043e \u0432\u044b\u0432\u043e\u0434\u043e\u0432:<\/p>\n<ol>\n<li>\n<p>\u0420\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u044b\u0435 \u0444\u0440\u0435\u0439\u043c\u0432\u043e\u0440\u043a\u0438, \u0432 \u0442\u043e\u043c \u0447\u0438\u0441\u043b\u0435 \u0438 Reactor, \u0434\u0435\u043b\u0430\u0435\u0442 \u0437\u0430 \u043d\u0430\u0441 \u0441\u0443\u0449\u0435\u0441\u0442\u0432\u0435\u043d\u043d\u0443\u044e \u0447\u0430\u0441\u0442\u044c \u0440\u0430\u0431\u043e\u0442\u044b \u043f\u043e \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438 \u0430\u0441\u0438\u043d\u0445\u0440\u043e\u043d\u043d\u044b\u0445 \u043c\u043d\u043e\u0433\u043e\u043f\u043e\u0442\u043e\u0447\u043d\u044b\u0445 \u0430\u043b\u0433\u043e\u0440\u0438\u0442\u043c\u043e\u0432, \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u044f \u0441\u043e\u0441\u0440\u0435\u0434\u043e\u0442\u043e\u0447\u0438\u0442\u044c\u0441\u044f \u043d\u0430 \u043f\u0440\u0435\u0434\u043c\u0435\u0442\u043d\u043e\u0439 \u043e\u0431\u043b\u0430\u0441\u0442\u0438.<\/p>\n<\/li>\n<li>\n<p>\u0421 \u0438\u0445 \u043f\u043e\u043c\u043e\u0449\u044c\u044e \u043c\u043e\u0436\u043d\u043e, \u0434\u043e\u0441\u0442\u0430\u0442\u043e\u0447\u043d\u043e \u043f\u0440\u043e\u0441\u0442\u043e, \u0441\u043e\u0437\u0434\u0430\u0432\u0430\u0442\u044c \u0432\u044b\u0441\u043e\u043a\u043e\u043d\u0430\u0433\u0440\u0443\u0436\u0435\u043d\u043d\u044b\u0435 \u0441\u0435\u0440\u0432\u0438\u0441\u044b.<\/p>\n<\/li>\n<li>\n<p>\u0414\u043b\u044f \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u0438\u044f \u043c\u0430\u043a\u0441\u0438\u043c\u0430\u043b\u044c\u043d\u043e\u0433\u043e \u044d\u0444\u0444\u0435\u043a\u0442\u0430 \u043e\u0442 \u043f\u0435\u0440\u0435\u0445\u043e\u0434\u0430 \u043d\u0430 \u0440\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u044b\u0435 \u043f\u0430\u0442\u0442\u0435\u0440\u043d\u044b \u043f\u0440\u043e\u0433\u0440\u0430\u043c\u043c\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u043d\u0443\u0436\u043d\u043e \u0447\u0442\u043e\u0431\u044b \u0432\u0441\u044f \u0446\u0435\u043f\u043e\u0447\u043a\u0430 \u0432\u044b\u0447\u0438\u0441\u043b\u0435\u043d\u0438\u0439 \u0431\u044b\u043b\u0430 \u0440\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u043e\u0439, \u043d\u0430\u0447\u0438\u043d\u0430\u044f \u0441 \u0434\u0440\u0430\u0439\u0432\u0435\u0440\u043e\u0432 \u0434\u043e\u0441\u0442\u0443\u043f\u0430 \u043a \u0431\u0430\u0437\u0430\u043c \u0434\u0430\u043d\u043d\u044b\u0445, \u043e\u0447\u0435\u0440\u0435\u0434\u044f\u043c, \u0444\u0430\u0439\u043b\u0430\u043c \u0438 \u0442.\u0434.<\/p>\n<\/li>\n<\/ol>\n<p>\u0415\u0449\u0451 \u0440\u0430\u0437, \u0440\u0435\u043f\u043e\u0437\u0438\u0442\u043e\u0440\u0438\u0439 \u0441 \u043a\u043e\u0434\u043e\u043c \u0438 \u043d\u0430\u0441\u0442\u0440\u043e\u0439\u043a\u0430\u043c\u0438 \u043d\u0430\u0445\u043e\u0434\u0438\u0442\u0441\u044f <a href=\"https:\/\/github.com\/ValentinMorozov\/elastic-search\" rel=\"noopener noreferrer nofollow\">\u0437\u0434\u0435\u0441\u044c<\/a><\/p>\n<p>\u041d\u0435\u0441\u043a\u043e\u043b\u044c\u043a\u043e \u0441\u0441\u044b\u043b\u043e\u043a \u043d\u0430 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c\u044b\u0435 \u043c\u0430\u0442\u0435\u0440\u0438\u0430\u043b\u044b:<\/p>\n<p><a href=\"https:\/\/habr.com\/ru\/company\/otus\/blog\/541404\/\" rel=\"noopener noreferrer nofollow\">\u0428\u043f\u0430\u0440\u0433\u0430\u043b\u043a\u0430 \u043f\u043e Spring Boot WebClient<\/a><\/p>\n<p><a href=\"https:\/\/medium.com\/@kirill.sereda\/reactive-programming-reactor-%D0%B8-spring-webflux-%D1%87%D0%B0%D1%81%D1%82%D1%8C-2-a0273a5d4ebd\" rel=\"noopener noreferrer nofollow\"><u>Reactive Programming: Reactor \u0438 Spring WebFlux \u2014 \u0447\u0430\u0441\u0442\u044c 2<\/u><\/a><\/p>\n<p>\u0420\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u043e\u0435\u00a0<a href=\"https:\/\/habr.com\/ru\/post\/565050\/\" rel=\"noopener noreferrer nofollow\">\u043f\u0440\u043e\u0433\u0440\u0430\u043c\u043c\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435<\/a> \u0441\u043e Spring, \u0447\u0430\u0441\u0442\u044c\u00a02 Project Reactor<\/p>\n<\/p>\n<\/div>\n<\/div>\n<\/div>\n<div class=\"v-portal\" style=\"display:none;\"><\/div>\n<\/div>\n<p> <!----> <!----><br \/> \u0441\u0441\u044b\u043b\u043a\u0430 \u043d\u0430 \u043e\u0440\u0438\u0433\u0438\u043d\u0430\u043b \u0441\u0442\u0430\u0442\u044c\u0438 <a href=\"https:\/\/habr.com\/ru\/post\/699112\/\"> https:\/\/habr.com\/ru\/post\/699112\/<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"<div><\/div>\n<div id=\"post-content-body\">\n<div>\n<div class=\"article-formatted-body article-formatted-body article-formatted-body_version-2\">\n<div xmlns=\"http:\/\/www.w3.org\/1999\/xhtml\">\n<h2>\u0426\u0435\u043b\u044c\/\u0432\u0432\u0435\u0434\u0435\u043d\u0438\u0435<\/h2>\n<p>\u0420\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u044b\u0435 \u043f\u0430\u0442\u0442\u0435\u0440\u043d\u044b \u043f\u0440\u043e\u0433\u0440\u0430\u043c\u043c\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u044f \u0441\u0442\u0430\u043d\u043e\u0432\u044f\u0442\u0441\u044f \u0432\u0441\u0451 \u0431\u043e\u043b\u0435\u0435 \u0432\u043e\u0441\u0442\u0440\u0435\u0431\u043e\u0432\u0430\u043d\u044b \u043f\u0440\u0438 \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438 \u0432\u044b\u0441\u043e\u043a\u043e\u043d\u0430\u0433\u0440\u0443\u0436\u0435\u043d\u043d\u044b\u0445 \u0441\u0435\u0440\u0432\u0438\u0441\u043e\u0432. \u0420\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u044b\u0435 \u0444\u0440\u0435\u0439\u043c\u0432\u043e\u0440\u043a\u0438 \u043f\u0440\u0435\u0434\u043e\u0441\u0442\u0430\u0432\u043b\u044f\u044e\u0442 \u0438\u043d\u0441\u0442\u0440\u0443\u043c\u0435\u043d\u0442\u044b, \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u044e\u0449\u0438\u0435 \u0441 \u043c\u0438\u043d\u0438\u043c\u0430\u043b\u044c\u043d\u044b\u043c\u0438 \u0437\u0430\u0442\u0440\u0430\u0442\u0430\u043c\u0438 \u043d\u0430 \u043a\u043e\u0434\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c \u043c\u0435\u0445\u0430\u043d\u0438\u0437\u043c\u044b \u0430\u0441\u0438\u043d\u0445\u0440\u043e\u043d\u043d\u043e\u0441\u0442\u0438 \u0438 \u043c\u043d\u043e\u0433\u043e\u043f\u043e\u0442\u043e\u0447\u043d\u043e\u0441\u0442\u0438.<\/p>\n<p>\u0412 \u043a\u0430\u0447\u0435\u0441\u0442\u0432\u0435 \u043f\u0440\u0438\u043c\u0435\u0440\u0430, \u043f\u0440\u0435\u0434\u043b\u0430\u0433\u0430\u044e \u0440\u0430\u0441\u0441\u043c\u043e\u0442\u0440\u0435\u0442\u044c \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044e \u0441\u0435\u0440\u0432\u0438\u0441 \u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438 \u0434\u0430\u043d\u043d\u044b\u0445 \u0432 ElasticSearch. \u0414\u0430\u043d\u043d\u044b\u0435 \u0445\u0440\u0430\u043d\u044f\u0442\u0441\u044f \u0432 MongoDB, \u043a\u043b\u044e\u0447\u0435\u0432\u044b\u0435 \u0430\u0442\u0440\u0438\u0431\u0443\u0442\u044b \u043a\u043e\u0442\u043e\u0440\u044b\u0445 \u0441\u0438\u043d\u0445\u0440\u043e\u043d\u0438\u0437\u0438\u0440\u0443\u044e\u0442\u0441\u044f \u0441 ElasticSearch (\u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u044c\u043d\u043e \u043f\u043e\u0445\u043e\u0436\u0435 \u043d\u0430 Logstash). \u0412 \u043f\u0440\u043e\u0435\u043a\u0442\u0435 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u0442\u0441\u044f \u0441\u0442\u0435\u043a: Java\/Spring Boot\/Reactor\/WebFlux\/WebClient\/RabbitMQ\/MongoDB. \u041d\u0430 \u0432\u044b\u0431\u043e\u0440 RabbitMQ \u0438 MongoDB \u043f\u043e\u0432\u043b\u0438\u044f\u043b\u043e, \u0432 \u0442\u043e\u043c \u0447\u0438\u0441\u043b\u0435, \u043d\u0430\u043b\u0438\u0447\u0438\u0435 \u0440\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u044b\u0445 \u0434\u0440\u0430\u0439\u0432\u0435\u0440\u043e\u0432.<\/p>\n<h3>\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0437\u0430\u0434\u0430\u0447\u0438<\/h3>\n<ol>\n<li>\n<p>\u0421\u0435\u0440\u0432\u0438\u0441 \u0434\u043e\u043b\u0436\u0435\u043d \u043f\u0440\u0438\u043d\u0438\u043c\u0430\u0442\u044c \u043f\u043e\u0442\u043e\u043a \u0434\u0430\u043d\u043d\u044b\u0445 \u0438\u0437 \u043e\u0447\u0435\u0440\u0435\u0434\u0438, \u0432\u044b\u0431\u0438\u0440\u0430\u0442\u044c \u0441\u0432\u044f\u0437\u0430\u043d\u043d\u044b\u0435 \u0434\u0430\u043d\u043d\u044b\u0435 \u0438\u0437 \u0431\u0430\u0437\u044b \u0438 \u043f\u0435\u0440\u0435\u0434\u0430\u0432\u0430\u0442\u044c \u0438\u0445 ElasticSearch. \u0424\u043e\u0440\u043c\u0430\u0442 \u0434\u0430\u043d\u043d\u044b\u0445 \u043e\u0447\u0435\u0440\u0435\u0434\u0438: \u0434\u0435\u0439\u0441\u0442\u0432\u0438\u0435 (index\/delete); id \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430; \u0438\u043c\u044f \u0438\u043d\u0434\u0435\u043a\u0441\u0430; \u0442\u0438\u043f \u0438\u043d\u0434\u0435\u043a\u0441\u0430 (\u043e\u043f\u0446\u0438\u043e\u043d\u0430\u043b\u044c\u043d\u043e).<\/p>\n<\/li>\n<li>\n<p>\u0427\u0435\u0440\u0435\u0437 web-\u0438\u043d\u0442\u0435\u0440\u0444\u0435\u0439\u0441 \u0434\u043e\u043b\u0436\u0435\u043d \u0431\u044b\u0442\u044c \u0440\u0435\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u043d \u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b \u0434\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u044f, \u0443\u0434\u0430\u043b\u0435\u043d\u0438\u044f \u0438 \u043f\u0435\u0440\u0435\u0441\u0442\u0440\u043e\u0435\u043d\u0438\u044f \u0438\u043d\u0434\u0435\u043a\u0441\u0430.<\/p>\n<\/li>\n<li>\n<p>\u0414\u043e\u043b\u0436\u043d\u0430 \u0431\u044b\u0442\u044c \u0432\u043e\u0437\u043c\u043e\u0436\u043d\u043e\u0441\u0442\u044c \u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u044f \u0430\u0433\u0440\u0435\u0433\u0438\u0440\u043e\u0432\u0430\u043d\u043d\u044b\u0445 \u043f\u043e\u043b\u0435\u0439, \u0441\u043e\u0434\u0435\u0440\u0436\u0430\u0449\u0438\u0445 \u0434\u0430\u043d\u043d\u044b\u0435 \u0438\u0437 \u043d\u0435\u0441\u043a\u043e\u043b\u044c\u043a\u0438\u0445 \u0438\u0441\u0445\u043e\u0434\u043d\u044b\u0445 \u043f\u043e\u043b\u0435\u0439 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432, \u0438 \u0434\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u0432 \u0438\u043d\u0434\u0435\u043a\u0441 \u0438\u0437 \u0441\u0432\u044f\u0437\u0430\u043d\u043d\u044b\u0445 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0439.<\/p>\n<\/li>\n<li>\n<p>\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0438\u043d\u0434\u0435\u043a\u0441\u0438\u0440\u0443\u0435\u043c\u044b\u0445 \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043e\u043b\u0436\u043d\u043e \u0431\u044b\u0442\u044c \u0432 \u0444\u043e\u0440\u043c\u0430\u0442\u0435 JSON.<\/p>\n<\/li>\n<\/ol>\n<h2>DFD-\u0434\u0438\u0430\u0433\u0440\u0430\u043c\u043c\u0430 \u043f\u0440\u043e\u0446\u0435\u0441\u0441\u0430 \u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438<\/h2>\n<p>\u0421\u0445\u0435\u043c\u0430 \u043f\u0440\u043e\u0446\u0435\u0441\u0441\u0430 \u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432, \u043f\u043e\u0441\u0442\u0443\u043f\u0430\u044e\u0449\u0438\u0445 \u0438\u0437 \u043e\u0447\u0435\u0440\u0435\u0434\u0438, \u0432\u044b\u0433\u043b\u044f\u0434\u0438\u0442 \u0441\u043b\u0435\u0434\u0443\u044e\u0449\u0438\u043c \u043e\u0431\u0440\u0430\u0437\u043e\u043c:<\/p>\n<figure class=\"full-width\"><figcaption><\/figcaption><\/figure>\n<p>\u0410\u043b\u0433\u043e\u0440\u0438\u0442\u043c \u043f\u0435\u0440\u0435\u0441\u0442\u0440\u043e\u0435\u043d\u0438\u044f \u0438\u043d\u0434\u0435\u043a\u0441\u0430 \u0432\u044b\u0433\u043b\u044f\u0434\u0438\u0442 \u043f\u0440\u0430\u043a\u0442\u0438\u0447\u0435\u0441\u043a\u0438 \u0442\u0430\u043a\u0436\u0435, \u0437\u0430 \u0438\u0441\u043a\u043b\u044e\u0447\u0435\u043d\u0438\u0435\u043c \u0442\u043e\u0433\u043e, \u0447\u0442\u043e \u0432 \u043d\u0435\u043c \u043e\u0442\u0441\u0443\u0442\u0441\u0442\u0432\u0443\u0435\u0442 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0430 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432, \u043e\u0442\u043b\u043e\u0436\u0435\u043d\u043d\u044b\u0445 \u0438\u0437-\u0437\u0430 \u043e\u0448\u0438\u0431\u043e\u043a.<\/p>\n<h2>\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u0430<\/h2>\n<p>\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u0430 \u043a\u043e\u0441\u043d\u0435\u0442\u0441\u044f \u0442\u043e\u043b\u044c\u043a\u043e \u0440\u0430\u0431\u043e\u0442\u044b \u0440\u0435\u0430\u043a\u0442\u0438\u0432\u043d\u043e\u0439 \u0447\u0430\u0441\u0442\u0438 \u0441\u0435\u0440\u0432\u0438\u0441\u0430. \u041a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u043e\u043d\u043d\u044b\u0435 \u043d\u0430\u0441\u0442\u0440\u043e\u0439\u043a\u0438, \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0430 \u0444\u043e\u0440\u043c\u0430\u0442\u0430 \u043e\u043f\u0438\u0441\u0430\u043d\u0438\u044f \u0438\u043d\u0434\u0435\u043a\u0441\u0438\u0440\u0443\u0435\u043c\u044b\u0445 \u0434\u0430\u043d\u043d\u044b\u0445, \u0444\u043e\u0440\u043c\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043b\u044f \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432 \u043a ElasticSearch \u0432\u044b\u043d\u0435\u0441\u0435\u043d\u044b \u0437\u0430 \u0440\u0430\u043c\u043a\u0438 \u0434\u0430\u043d\u043d\u043e\u0439 \u0441\u0442\u0430\u0442\u044c\u0438, \u043d\u043e \u0432\u044b \u043c\u043e\u0436\u0435\u0442\u0435 \u043f\u043e\u0441\u043c\u043e\u0442\u0440\u0435\u0442\u044c \u043a\u043e\u0434 \u043d\u0430 GitHub, \u043f\u043e <a href=\"https:\/\/github.com\/ValentinMorozov\/elastic-search\" rel=\"noopener noreferrer nofollow\">\u0441\u0441\u044b\u043b\u043a\u0435<\/a>.<\/p>\n<p>\u0422\u0435\u043f\u0435\u0440\u044c \u043f\u043e\u043f\u0440\u043e\u0431\u0443\u0435\u043c \u0440\u0435\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u0442\u044c \u044d\u0442\u0443 \u0441\u0445\u0435\u043c\u0443 \u0441\u043a\u0432\u043e\u0437\u043d\u044b\u043c \u043f\u043e\u0442\u043e\u043a\u043e\u043c Reactor, \u043d\u0435 \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u044f \u043f\u043e\u0434\u043f\u0438\u0441\u043a\u0438 \u043d\u0430 \u043e\u0442\u0434\u0435\u043b\u044c\u043d\u044b\u0435 \u044d\u043b\u0435\u043c\u0435\u043d\u0442\u044b, \u0432 \u0442\u043e\u043c \u0447\u0438\u0441\u043b\u0435 \u043e\u0442\u043f\u0440\u0430\u0432\u043a\u0443 \u0447\u0435\u0440\u0435\u0437 WebClient HTTP-\u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432 \u0438 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0443 \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u043d\u044b\u0445 \u043e\u0442\u0432\u0435\u0442\u043e\u0432. \u041e\u0442\u0434\u0430\u0434\u0438\u043c, \u043f\u043e\u0447\u0442\u0438 \u043f\u043e\u043b\u043d\u043e\u0441\u0442\u044c\u044e, \u0441\u0438\u043d\u0445\u0440\u043e\u043d\u0438\u0437\u0430\u0446\u0438\u044e \u0432\u044b\u043f\u043e\u043b\u043d\u0435\u043d\u0438\u044f Reactor. <\/p>\n<p>\u041a\u043e\u0434, \u0437\u0430\u043f\u0443\u0441\u043a\u0430\u044e\u0449\u0438\u0439 \u043f\u0440\u043e\u0446\u0435\u0441\u0441 \u043f\u0435\u0440\u0435\u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438 \u0432\u044b\u0433\u043b\u044f\u0434\u0438\u0442 \u0441\u043b\u0435\u0434\u0443\u044e\u0449\u0438\u043c \u043e\u0431\u0440\u0430\u0437\u043e\u043c: <\/p>\n<pre><code>Task task = new Task(mongoElasticIndex); ParallelFlux dataEventsFlux = reactorRepositoryMongoDB         .findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())         .parallel(appConfig.getIndexParallelism())         .runOn(Schedulers.boundedElastic()); Flux&lt;Tuple2&lt;String,Document>> processingData = processingData(dataEventsFlux, (p) -> \"index\",         (p) -> (Document)p,         (p) -> mongoElasticIndex,         Flux.just(),         task);  task.setDispose(subscribe(processedData, task));<\/code><\/pre>\n<p>\u041f\u043e\u043b\u0443\u0447\u0430\u0435\u043c \u043f\u043e\u0442\u043e\u043a \u0434\u0430\u043d\u043d\u044b\u0445 \u0438\u0437 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0438, \u043d\u0430\u0441\u0442\u0440\u0430\u0438\u0432\u0430\u0435\u043c \u043f\u0430\u0440\u0430\u043b\u043b\u0435\u043b\u0438\u0437\u043c,  \u0444\u043e\u0440\u043c\u0438\u0440\u0443\u0435\u043c \u043e\u0431\u044a\u0435\u043a\u0442 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0438 \u043f\u043e\u0442\u043e\u043a\u0430 \u0438 \u043f\u043e\u0434\u043f\u0438\u0441\u044b\u0432\u0430\u0435\u043c\u0441\u044f \u043d\u0430 \u043f\u043e\u0442\u043e\u043a. \u0417\u0434\u0435\u0441\u044c \u043a\u043b\u0430\u0441\u0441 Task \u2013 \u0432\u043d\u0443\u0442\u0440\u0435\u043d\u043d\u0438\u0439 \u043a\u043b\u0430\u0441\u0441, \u043d\u0430\u0437\u043d\u0430\u0447\u0435\u043d\u0438\u0435 \u043a\u043e\u0442\u043e\u0440\u043e\u0433\u043e: \u0441\u043e\u0431\u0438\u0440\u0430\u0442\u044c \u0441\u0442\u0430\u0442\u0438\u0441\u0442\u0438\u043a\u0443 \u0438 \u043f\u0440\u0435\u0434\u043e\u0441\u0442\u0430\u0432\u043b\u044f\u0442\u044c \u0438\u043d\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u044e \u043e \u0432\u044b\u043f\u043e\u043b\u043d\u044f\u0435\u043c\u044b\u0445 \u0437\u0430\u0434\u0430\u0447\u0430\u0445 \u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u0438.<\/p>\n<p>\u041c\u0435\u0442\u043e\u0434 processingData \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u043f\u043e\u0442\u043e\u043a \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432 \u0438 \u043e\u0442\u0432\u0435\u0442\u043e\u0432, \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u0435\u043d\u043d\u044b\u0445 WebClient\u2019\u043e\u043c:<\/p>\n<pre><code>private &lt;T> Flux&lt;Tuple2&lt;String,Document>>     processingData(ParallelFlux&lt;T> events,             Function&lt;T, String> getAction,                    Function&lt;T, Document> getDocument,                    Function&lt;T, MongoElasticIndex> getMongoElasticIndex,                    Flux&lt;String> mergeFlux,                    Task task) {     return  events             \/\/ \u0414\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u043a \u0438\u0441\u0445\u043e\u0434\u043d\u043e\u043c\u0443 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0443 \u0438\u0437 \u043f\u0440\u0438\u0441\u043e\u0435\u0434\u0438\u043d\u044f\u0435\u043c\u044b\u0445 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0439         .transform(joinData(getDocument, getMongoElasticIndex))             \/\/ \u0413\u0435\u043d\u0435\u0440\u0430\u0446\u0438\u044f \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043b\u044f \u043f\u0435\u0440\u0435\u0434\u0430\u0447\u0438 \u0432 ElasticSearch         .transform(document2ElasticJson(getAction, getDocument, getMongoElasticIndex))         .sequential()             \/\/ \u0410\u0433\u0440\u0435\u0433\u0438\u0440\u043e\u0432\u0430\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043b\u044f _bulk         .transform(grouping(task))             \/\/ \u0414\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u0435 \u043f\u043e\u0442\u043e\u043a\u0430 \u0434\u0430\u043d\u043d\u044b\u0445, \u043d\u0430 \u043a\u043e\u0442\u043e\u0440\u044b\u0435 \u043d\u0435 \u043f\u043e\u043b\u0443\u0447\u0435\u043d \u043e\u0442\u0432\u0435\u0442 \u043e\u0442 ElasticSearch         .mergeWith(mergeFlux)             \/\/ \u041e\u0442\u043f\u0440\u0430\u0432\u043a\u0430 \u0437\u0430\u043f\u0440\u043e\u0441\u043e\u0432 \u0432 ElasticSearch         .transform(postBulk(task))         .subscribeOn(Schedulers.single())         .doOnNext(testAliveResponses(task))         .doOnSubscribe(p-> p.request(appConfig.getMaxSizeBuffer() * 2))         .doOnComplete(() -> { logger.info(\"Start: {} End: {} read {} write {}\",                 formatDate(task.getStartDate()),                 formatDate(new Date()),                 task.getDocumentsRead(),                 task.getIndexesWrite(), getMaxProcessingRequest());             fileStorage.writeCollection2Files(waitingForResponse);             removeTask(task);         }); }<\/code><\/pre>\n<p>\u041c\u0435\u0442\u043e\u0434\u043e\u043c transform Reactor \u0441\u043e\u0435\u0434\u0438\u043d\u044f\u0435\u043c \u043e\u0442\u0434\u0435\u043b\u044c\u043d\u044b\u0435 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u0447\u0438\u043a\u0438 \u043f\u043e\u0442\u043e\u043a\u043e\u0432. \u0417\u0434\u0435\u0441\u044c \u0435\u0441\u0442\u044c \u043e\u0434\u043d\u043e \u0441\u0443\u0449\u0435\u0441\u0442\u0432\u0435\u043d\u043d\u043e\u0435 \u043e\u0433\u0440\u0430\u043d\u0438\u0447\u0435\u043d\u0438\u0435: \u0432\u0445\u043e\u0434\u044f\u0449\u0438\u0439 \u0438 \u0438\u0441\u0445\u043e\u0434\u044f\u0449\u0438\u0439 \u043f\u043e\u0442\u043e\u043a\u0438 \u0434\u043e\u043b\u0436\u043d\u044b \u0431\u044b\u0442\u044c \u043e\u0434\u043d\u043e\u0442\u0438\u043f\u043d\u044b\u043c\u0438 (Flux \u0438\u043b\u0438 ParallelFlux). \u041d\u0435\u043b\u044c\u0437\u044f, \u043d\u0430\u043f\u0440\u0438\u043c\u0435\u0440, \u0441 \u043f\u043e\u043c\u043e\u0449\u044c\u044e transform \u0432\u0441\u0442\u0440\u043e\u0438\u0442\u044c \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u0447\u0438\u043a \u0443 \u043a\u043e\u0442\u043e\u0440\u043e\u0433\u043e \u0432\u0445\u043e\u0434 Flux, \u0430 \u0432\u044b\u0445\u043e\u0434 ParallelFlux.<\/p>\n<p>\u0412 \u043c\u0435\u0442\u043e\u0434 subscribe \u0441\u0435\u0440\u0432\u0438\u0441\u0430 \u0438\u043d\u043a\u0430\u043f\u0441\u0443\u043b\u0438\u0440\u043e\u0432\u0430\u043d\u0430 \u043f\u043e\u0434\u043f\u0438\u0441\u043a\u0430 \u043d\u0430 \u043f\u043e\u0442\u043e\u043a. \u041d\u0438\u0436\u0435 \u043f\u0440\u0438\u0432\u0435\u0434\u0435\u043d\u0430 \u0435\u0433\u043e \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044f:<\/p>\n<pre><code>private Disposable subscribe(Flux&lt;Tuple2&lt;String,Document>> events, Task task) {     return  events         .subscribe(             p -> {                 if(isNull(task.getMongoElasticIndex())) { \/\/ \u0415\u0441\u043b\u0438 \u0437\u0430\u0434\u0430\u0447\u0430 \u043d\u0435 \u043f\u0435\u0440\u0435\u0438\u043d\u0434\u0435\u043a\u0441\u0430\u0446\u0438\u044f                     waitingForResponse.remove(p.getT1());                 }                 int count = Optional.ofNullable(p.getT2().get(\"items\", List.class))                         .map(List::size)                         .orElse(0);                 task.addIndexesWrite(count);             },             e -> {                 if(task != rabbitMQTask)removeTask(task);                 fileStorage.writeCollection2Files(waitingForResponse);                 logger.error(\"Error: {}\", e.getMessage());             }         ); }<\/code><\/pre>\n<p>\u0414\u0430\u043b\u0435\u0435 \u043a\u043e\u0440\u043e\u0442\u043a\u043e \u043e\u0431 \u043e\u0442\u0434\u0435\u043b\u044c\u043d\u044b\u0445 \u0444\u0443\u043d\u043a\u0446\u0438\u044f\u0445 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0438 \u043f\u043e\u0442\u043e\u043a\u0430.<\/p>\n<h3>\u0417\u0430\u0433\u0440\u0443\u0437\u043a\u0430 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432<\/h3>\n<p>\u0418\u043c\u0435\u044e\u0442\u0441\u044f \u0434\u0432\u0430 \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0430 \u0437\u0430\u0433\u0440\u0443\u0437\u043a\u0438: <\/p>\n<ul>\n<li>\n<p>\u0414\u043b\u044f \u0432\u0441\u0435\u0445 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432 \u043e\u0441\u043d\u043e\u0432\u043d\u043e\u0439 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0438 \u0438\u043d\u0434\u0435\u043a\u0441\u0430<\/p>\n<\/li>\n<\/ul>\n<pre><code>ParallelFlux dataEventsFlux = reactorRepositoryMongoDB         .findAll(mongoElasticIndex.getCollection(), mongoElasticIndex.getProjection())         .parallel(appConfig.getIndexParallelism())         .runOn(Schedulers.boundedElastic());<\/code><\/pre>\n<p>\u041c\u0435\u0442\u043e\u0434 findAll \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u043f\u043e\u0442\u043e\u043a \u0434\u043b\u044f \u0432\u0441\u0435\u0445 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0438. Parallel \u0438 runOn \u043d\u0430\u0441\u0442\u0440\u0430\u0438\u0432\u0430\u044e\u0442 \u043c\u043d\u043e\u0433\u043e\u043f\u043e\u0442\u043e\u0447\u043d\u043e\u0441\u0442\u044c \u0434\u043b\u044f \u0432\u044b\u0431\u043e\u0440\u043a\u0438 \u0438 \u0434\u0430\u043b\u044c\u043d\u0435\u0439\u0448\u0435\u0439 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0438.   <\/p>\n<ul>\n<li>\n<p>\u0414\u043b\u044f \u0435\u0434\u0438\u043d\u0438\u0447\u043d\u043e\u0433\u043e \u0437\u0430\u043f\u0440\u043e\u0441\u0430, \u043f\u0440\u0438\u0445\u043e\u0434\u044f\u0449\u0435\u043c\u0443 \u0438\u0437 \u043e\u0447\u0435\u0440\u0435\u0434\u0438   <\/p>\n<\/li>\n<\/ul>\n<pre><code>ParallelFlux dataEventsFlux = reactiveQueue.inboundFlux()         .parallel(appConfig.getIndexParallelism())         .runOn(Schedulers.boundedElastic())         .map(msg -> {             IndexEvent indexEvent = reactiveQueue.msg2IndexEvent(msg);             try {                 return CreateIndexItem(indexEvent);             } catch (IllegalObjectIdException | IOException | ConvertDataException e) {                 logger.error(\"{} For message: {}\", String.join(\", \",throwable2ListMessage(e)),                         new String(msg.getBody(), StandardCharsets.UTF_8));                 return new IndexItem(null, null, null);             }         })         .filter(e -> nonNull(e.getAction()))         .flatMap(item ->             Flux.zip(\"delete\".equals(item.getAction())                         \/\/ \u0414\u043b\u044f \u043e\u043f\u0435\u0440\u0430\u0446\u0438\u0438 \u0443\u0434\u0430\u043b\u0435\u043d\u0438\u044f \u0441\u043e\u0437\u0434\u0430\u0451\u0442\u0441\u044f Document, \u0441\u043e\u0434\u0435\u0440\u0436\u0430\u0449\u0438\u0439 _id \u0443\u0434\u0430\u043b\u044f\u0435\u043c\u043e\u0433\u043e \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430                     ? Flux.just(new Document().append(\"_id\", item.getIdDocument().get(\"_id\")))                         \/\/ \u0414\u043b\u044f \u043e\u043f\u0435\u0440\u0430\u0446\u0438\u0438 \u043e\u0431\u043d\u043e\u0432\u043b\u0435\u043d\u0438\u044f \u0438\u043d\u0434\u0435\u043a\u0441\u0430 Document \u0437\u0430\u0433\u0440\u0443\u0436\u0430\u0435\u0442\u0441\u044f \u0438\u0437 \u0431\u0430\u0437\u044b \u0434\u0430\u043d\u043d\u044b\u0445                     : reactorRepositoryMongoDB.find(                         item.getMongoElasticIndex().getCollection(),                         item.getIdDocument(),                         item.getMongoElasticIndex().getProjection()),                 Flux.just(item)             )             .map(d -> new EventDocument(d.getT2().getAction(),                     d.getT1(),                     d.getT2().getMongoElasticIndex()))         );<\/code><\/pre>\n<p>\u041c\u0435\u0442\u043e\u0434 inboundFlux \u0438\u043d\u0442\u0435\u0440\u0444\u0435\u0439\u0441\u0430 reactiveQueue \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u043f\u043e\u0442\u043e\u043a \u0434\u043b\u044f \u043e\u0447\u0435\u0440\u0435\u0434\u0438. Parallel \u0438 runOn \u0438\u0434\u0435\u043d\u0442\u0438\u0447\u043d\u044b \u043f\u0440\u0435\u0434\u044b\u0434\u0443\u0449\u0435\u043c\u0443 \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0443. \u0414\u0430\u043b\u0435\u0435 \u0441\u043e\u0431\u044b\u0442\u0438\u0435 \u043f\u0440\u0435\u043e\u0431\u0440\u0430\u0437\u0443\u0435\u0442\u0441\u044f \u0438\u0437 JSON \u0432 \u043e\u0431\u044a\u0435\u043a\u0442 IndexEvent, \u043f\u043e \u0441\u043e\u0434\u0435\u0440\u0436\u0438\u043c\u043e\u043c\u0443 \u043a\u043e\u0442\u043e\u0440\u043e\u0433\u043e \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442 \u0438\u0437\u0432\u043b\u0435\u043a\u0430\u044e\u0442\u0441\u044f \u0438\u0437 \u0431\u0430\u0437\u044b, \u0438\u043b\u0438 \u0441\u043e\u0437\u0434\u0430\u0451\u0442\u0441\u044f \u043e\u0431\u044a\u0435\u043a\u0442 \u0434\u043b\u044f \u0443\u0434\u0430\u043b\u0435\u043d\u0438\u044f \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0430 \u0438\u0437 ElasticSearch.<\/p>\n<h3>\u0414\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u043a \u0438\u0441\u0445\u043e\u0434\u043d\u043e\u043c\u0443 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0443 \u0438\u0437 \u043f\u0440\u0438\u0441\u043e\u0435\u0434\u0438\u043d\u044f\u0435\u043c\u044b\u0445 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0439   <\/h3>\n<pre><code>private &lt;T> Function&lt;ParallelFlux&lt;T>, ParallelFlux&lt;T>>     joinData(Function&lt;T, Document> getDocument,             Function&lt;T, MongoElasticIndex> getMongoElasticIndex) {     return (ParallelFlux&lt;T> items) ->             items.flatMap(p -> {             if(getDocument.apply(p).size() == 1) {                 return Flux.just(p);             }             return                 Flux.fromIterable(getMongoElasticIndex.apply((T) p).getJoinConditions(getDocument.apply(p)))                         .flatMap(it -> Flux.zip(Flux.just(it.getCollection().getJoinedFieldName()),                                 reactorRepositoryMongoDB.find(getMongoElasticIndex.apply((T) p).getCollection(),                                         it.getCondition(),                                         it.getCollection().getProjection())))                         .reduce(p, (acc, t) -> {                             getDocument.apply(acc).put(t.getT1(), t.getT2());                             return acc;                         });                 }         ); }<\/code><\/pre>\n<p>\u041c\u0435\u0442\u043e\u0434 joinData \u0432\u043e\u0437\u0432\u0440\u0430\u0449\u0430\u0435\u0442 \u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u044c\u043d\u044b\u0439 \u043e\u0431\u044a\u0435\u043a\u0442, \u0434\u043e\u0431\u0430\u0432\u043b\u044f\u044e\u0449\u0438\u0439 \u0434\u0430\u043d\u043d\u044b\u0435 \u043a \u0438\u0441\u0445\u043e\u0434\u043d\u043e\u043c\u0443 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u0443 \u0438\u0437 \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u0432 \u043f\u0440\u0438\u0441\u043e\u0435\u0434\u0438\u043d\u044f\u0435\u043c\u044b\u0445 \u043a\u043e\u043b\u043b\u0435\u043a\u0446\u0438\u0439. \u0418\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435 flatMap \u0438 Flux.zip \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u0435\u0442 \u0430\u0441\u0438\u043d\u0445\u0440\u043e\u043d\u043d\u043e \u0437\u0430\u043f\u0443\u0441\u043a\u0430\u0442\u044c \u0438 \u043e\u0431\u0440\u0430\u0431\u0430\u0442\u044b\u0432\u0430\u0442\u044c \u043f\u043e\u0442\u043e\u043a\u0438, \u0432 \u0442\u043e\u043c \u0447\u0438\u0441\u043b\u0435 \u0438 \u043f\u043e\u0442\u043e\u043a\u0438, \u0441\u043e\u0437\u0434\u0430\u0432\u0430\u0435\u043c\u044b\u0435 \u0437\u0430\u043f\u0440\u043e\u0441\u0430\u043c\u0438 \u043a \u0431\u0430\u0437\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 mongodb. \u0412\u0441\u0435 \u0432\u043e\u043f\u0440\u043e\u0441\u044b, \u0441\u0432\u044f\u0437\u0430\u043d\u043d\u044b\u0435 \u0441 \u0441\u0438\u043d\u0445\u0440\u043e\u043d\u0438\u0437\u0430\u0446\u0438\u0435\u0439, \u0431\u0435\u0440\u0435\u0442 \u043d\u0430 \u0441\u0435\u0431\u044f Reactor.<\/p>\n<h3>\u0413\u0435\u043d\u0435\u0440\u0430\u0446\u0438\u044f JSON \u0434\u043b\u044f ElasticSearch<\/h3>\n<pre><code>private &lt;T> Function&lt;ParallelFlux&lt;T>, ParallelFlux&lt;String>>     document2ElasticJson(             Function&lt;T, String> getAction,             Function&lt;T, Document> getDocument,             Function&lt;T, MongoElasticIndex> getMongoElasticIndex) {     return (ParallelFlux&lt;T> items) -> items.map(item -> {         String elasticSend;         try {             Document document = getDocument.apply(item);             MongoElasticIndex mongoElasticIndex = getMongoElasticIndex.apply(item);             elasticSend<\/code><\/pre>\n<\/div>\n<\/div>\n<\/div>\n<\/div>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[],"tags":[],"class_list":["post-341197","post","type-post","status-publish","format-standard","hentry"],"_links":{"self":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/341197","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=341197"}],"version-history":[{"count":0,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/341197\/revisions"}],"wp:attachment":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=341197"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=341197"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=341197"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}