Создание телеграм-бота (Spring Boot, Kafka, PostgreSQL), часть вторая

от автора

Иванов Максим

Младший Java программист

Рецепт легкого перекуса для «Telegram — монстра Франкенштейна»

Ничто не вызывает у нас столь мучительных страданий, как резкая и внезапная перемена.- «Франкенштейн» Мэри Шелли
Ничто не вызывает у нас столь мучительных страданий, как резкая и внезапная перемена.- «Франкенштейн» Мэри Шелли

Всем привет, это вторая часть создания телеграм-бота (ссылка на первую часть), в ней мы реализуем Kafka Consumer, который будет ловить любые колебания в силе и выдавать нам всю информацию о действиях пользователя.

Ингредиенты:

  1. Регистрация бота

  2. Создание Spring Boot проект, проще всего это сделать через Spring Initializr. (в качестве системы сборки будет использоваться Gradle)

  3. Kafka (для отслеживания топиков я использую Conductor)

  4. PostgreSQL (для комфортной работы я использую DBeaver)

Если возникнут сложности с воссозданием туториала

Прошу пишите в комментариях возникшие проблемы, на всякий случай — вот мой git и ТГ

Начинаем с яичных желтков:

Первостепенно нужно настроить build.gradle со всеми зависимостями

build.gradle
plugins { id 'org.springframework.boot' version '2.5.6' id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'java' }  group = 'com.demo.kafka' version = '0.0.1-SNAPSHOT' sourceCompatibility = '14'  repositories { mavenCentral() }  configurations.all { exclude module: 'slf4j-log4j12' }  dependencies { implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6' implementation 'org.springframework.kafka:spring-kafka:2.7.6'   implementation 'org.projectlombok:lombok:1.18.22' implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6' implementation 'org.springframework.data:spring-data-commons:2.6.0' implementation 'org.postgresql:postgresql:42.3.1' implementation 'com.h2database:h2:1.4.200'  testImplementation 'org.springframework.boot:spring-boot-starter-test:2.5.6' testImplementation 'org.springframework.kafka:spring-kafka-test:2.7.6'  compileOnly 'org.projectlombok:lombok:1.18.22' annotationProcessor 'org.projectlombok:lombok:1.18.22' }  test { useJUnitPlatform() }

Далее для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka consumer

application.yml
server:   port: 9002 spring:   kafka:     consumer:       bootstrap-servers: localhost:9092       group-id: group_id       auto-offset-reset: earliest       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Ну и в конце — настройки application.properties

application.properties
# HTTP port for incoming requests server.port=8080  # Log db app.db.demo.url=jdbc:postgresql://localhost:5432/change-me app.db.demo.driver=org.postgresql.Driver app.db.demo.user=change-me app.db.demo.password=change-me app.db.demo.pool-size=10  # kafka-metadata-consumer app.metadata.tag=logs app.metadata.bootstrapServers=athena:9092 app.metadata.groupId=group_id app.metadata.topic=users app.metadata.autoOffsetReset=earliest app.metadata.enableAutoCommit=false app.metadata.maxPollRecords=10 app.metadata.concurrency=4 app.metadata.path=files   # logging logging.level.root=INFO logging.level.org.springframework.web=DEBUG logging.level.ru.centerinform.webhook=TRACE logging.file.name=change-me

Хорошо, говоря о структуре проекта, то советую придерживаться такого вида:

Структура проекта
Структура проекта

Пакеты:

  • config — описание бинов и конфигурации проекта

  • controller — обрабатывает запрос пользователя

  • model — хранит модель данных, а так же описывает маппер для этой модели

  • repository — логика работа с БД

  • service — основная бизнес логика проекта

Намазываем на тост, посыпаем сыром и кидаем в духовку:

Так как настройка бинов, можно сказать — шаблонный код, за основу она идентична с первой частью, так что, не удивляйтесь сходству.

Настройки бинов:

— Первым делом в пакете config прописываем конфигурацию бинов нашего приложения, тут настройки инициализации JdbcTemplate, так же, обратите внимание, что внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc

DbConfig
@Configuration public class DbConfig extends DefaultDbConfig {      @Bean     @Qualifier("demo")     @ConfigurationProperties(prefix = "app.db.demo")     SpringDataJdbcProperties demoJdbcProperties() {         return new SpringDataJdbcProperties();     }      @Bean     @Qualifier("demo")     public DataSource demoDataSource(@Qualifier("demo") SpringDataJdbcProperties properties) {         return hikariDataSource("db", properties);     }      @Bean     @Qualifier("demo")     JdbcTemplate demoJdbcTemplate(@Qualifier("demo") DataSource dataSource) {         return new JdbcTemplate(dataSource);     }      @Data     @NoArgsConstructor     public static class SpringDataJdbcProperties {          // constants         private static final String H2_DATABASE_DRIVER = "org.h2.Driver";          /**          * JDBC URL property          */         String url;         /**          * JDBC driver class name property          */         String driver;         /**          * JDBC username property          */         String user;         /**          * JDBC password property          */         String password;         /**          * Hikari / Vertica maxPoolSize property          */         String poolSize;         /**          * Minimum pool size          */         int minPoolSize = 4;         /**          * Maximum pool size          */         int maxPoolSize = 10;         /**          * This property controls the maximum amount of time (in milliseconds) that a connection is allowed to          * sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.          */         long idleTimeout;         /**          * This property controls the maximum lifetime of a connection in the pool. When a connection          * reaches this timeout, even if recently used, it will be retired from the pool.          * An in-use connection will never be retired, only when it is idle will it be removed          */         long maxLifetime;         /**          * Bulk insert size          */         Integer bulkSize;           /**          * All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)          *          * @param url JDBC driver class name property          * @param driver JDBC driver class name property          * @param user JDBC username property          * @param password JDBC password property          * @param poolSize Hikari / Vertica maxPoolSize property          * @param bulkSize bulk insert size          */         public SpringDataJdbcProperties(                 String url, String driver, String user, String password, String poolSize, Integer bulkSize) {             this.url = url;             this.driver = driver;             this.user = user;             this.password = password;             this.poolSize = poolSize;             this.bulkSize = bulkSize;         }           /**          * Возвращает истину, если экземпляр описывает in-memory H2 database          *          * @return истина, если экземпляр описывает in-memory H2 database          */         public boolean isH2Database() {             return driver.equals(H2_DATABASE_DRIVER);         }          /**          * Возвращает строковое представление экземпляра объекта в формате JSON          *          * @return строковое представление экземпляра объекта в формате JSON          */         @Override         public String toString() {             var props = new SpringDataJdbcProperties(                     url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);             return Json.encode(props);         }      }  }

— Создадим базовый класс для уменьшения дублирования кода инициализации бинов

DefaultDbConfig
@Slf4j class DefaultDbConfig {      protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {         log.info("[{}] настройки БД: [{}]", tag, properties.toString());          HikariDataSource ds = new HikariDataSource();         ds.setJdbcUrl(properties.getUrl());         ds.setDriverClassName(properties.getDriver());         ds.setUsername(properties.getUser());         ds.setPassword(properties.getPassword());         ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));         return ds;     } }

— После, напишем утилитный класс для логирования

Json
public class Json {     static final ObjectMapper mapper = new ObjectMapper();      /**      * Encode instance as JSON      *      * @param obj instance      * @return JSON      */     public static String encode(Object obj) {         try {             return mapper.writeValueAsString(obj);         } catch (JsonProcessingException e) {             return obj.toString();         }     }      public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException {         return mapper.readValue(json, clazz);     }  }

Создание модели данных

— Создаем модель для обработки логов ConsumerLog, а так же маппер ConsumerMapper, который понадобиться для работы с БД и описывания полей в таблице

ConsumerLog
@Data @RequiredArgsConstructor public class ConsumerLog {      @JsonProperty("id")     @JsonIgnoreProperties(ignoreUnknown = true)     private final int id;      @JsonProperty("message")     @JsonIgnoreProperties(ignoreUnknown = true)     private final String msg;      @JsonProperty("topic")     @JsonIgnoreProperties(ignoreUnknown = true)     private final String topic;      @JsonProperty("logDate")     @JsonIgnoreProperties(ignoreUnknown = true)     private final LocalDate logDate;      @Override     public String toString() {         return "Was added log [id=" + id + ", topic=" + topic + "log=" + msg + ", date=" +  logDate.toString() + "]";     } }
ConsumerMapper
@Slf4j public class ConsumerMapper implements RowMapper<ConsumerLog> {      @Override     public ConsumerLog mapRow(ResultSet rs, int rowNum) throws SQLException {         var date = rs.getDate("date_time");         var entity = new ConsumerLog(                 rs.getInt("id"),                 rs.getString("message"),                 rs.getString("topic"),                 date == null ? null : date.toLocalDate()         );         log.trace("ConsumerMapper(): entity = [{}]", entity);         return entity;     } }

После создания модели данных и ее маппера, приступаем к репозиториям

— Создадим интерфейс, который описывает методы, для работы с записями в БД

IConsumerLogRepository
public interface IConsumerLogRepository {       /**       * Возвращает список записей       *       * @return список всех записей       * @throws DbException в случае ошибки БД       */      List<ConsumerLog> getLogsList();       /**       * Вставка новой записи       *       * @param entity новая запись       * @throws DbException в случае ошибки БД       */      void insert(ConsumerLog entity); }

— Теперь напишем класс, который реализует методы интерфейса

ConsumerLogRepository
@Repository @Slf4j public class ConsumerLogRepository implements IConsumerLogRepository {      private static final String SQL_SELECT_LIST = "SELECT id, message, date_time, topic FROM log";     private static final String SQL_INSERT = "INSERT INTO log (message, date_time, topic) VALUES (?, ?, ?)";      protected final static ConsumerMapper CONSUMER_LOG_MAPPER = new ConsumerMapper();      protected final JdbcTemplate template;      public ConsumerLogRepository(@Qualifier("demo") JdbcTemplate template) {         this.template = template;     }      /**      * Возвращает записи элемента из таблицы логов подписчика      */     @Override     public List<ConsumerLog> getLogsList() throws DbException {         return template.query(SQL_SELECT_LIST, CONSUMER_LOG_MAPPER);     }      /**      * Заполняет записи элементами из приходящего топика логов      */     @Override     public void insert(ConsumerLog entity) throws DbException {         var result = template.update(SQL_INSERT, entity.getMsg(), entity.getLogDate(), entity.getTopic());         if (result != 1) log.trace("ConsumerLogRepository.insert() with {} rows inserted", entity);         log.trace("insert({}) result={}", entity, result);     } }

Ну и главный элемент бизнес логики приложения — kafka consumer

— Это класс подписчик, он получает сообщения из Kafka и обрабатывает их

Consumer
@Slf4j @Service @AllArgsConstructor public class Consumer {     private static final String TOPIC_NAME = "users";     protected final IConsumerLogRepository consumerRepo;      /**      * Метод обработки сообщений от producer,      * который "отлавливает" эти самые сообщения с помощью аннотации KafkaListener и принимает их в виде параметра.      *      * @param message сообщение от producer, которое прилетает в кафка      */     @KafkaListener(topics = TOPIC_NAME, groupId = "group_id")     public void consumeWriting(String message) {         var consumerLog = new ConsumerLog(0, message, TOPIC_NAME, LocalDate.now());         consumerRepo.insert(consumerLog);         log.info("#### Consumed received message [{}]", message);     }      /**      * Получение списка логов из БД      */     public List<ConsumerLog> consumeLog() {         var list = consumerRepo.getLogsList();         list.forEach(msg -> log.info("#### Consumer list log [{}]", msg.toString()));         return list;     } }

Далее, как и в прошлой статье, мы напишем контроллер, для доступа к сервису из вне

— Создаем простенький контроллер, для получения списка логов из БД

TestController
@Slf4j @RestController @AllArgsConstructor @RequestMapping(value = "/kafka") public class TestController {      private final Consumer consumerService;      /**      * Возвращает записи элемента из таблицы логов подписчика      *      */     @GetMapping(value = "/log_list")     public String getLogList() {         log.trace("[GET] getLogList()");         return consumerService.consumeLog().toString();     } }

В заключении, класс, который собственно и запускает все наше приложение

BotLogsApplication
@SpringBootApplication public class BotLogsApplication { public static void main(String[] args) { SpringApplication.run(BotLogsApplication.class, args); } }

Перед тем, как вытаскивать из духовки наше блюдо, нужно подготовиться:

— Сначала проверим, запущена ли Kafka

запуск по команде - sudo su systemctl start kafka
запуск по команде — sudo su systemctl start kafka

— После, запускаем Conductor и видим, что у нас работает топик users

Вкладка topics
Вкладка topics

— Далее запускаем DBeaver и благодаря первой статье, у нас уже заранее создано 2 таблицы (log и user_table), схема создания таблиц из первой части

Отлично, вынимаем наши тосты из духовки:

— Запускаем проект, проверяем, что все настроено и корректно работает

Логи запущенного приложения
  .   ____          _            __ _ _ /\ / ' __ _ () __  __ _ \ \ \  ( ( )__ | '_ | '| | ' / ` | \ \ \  \/  )| |)| | | | | || (| |  ) ) ) ) '  || .__|| ||| |_, | / / / / =========||==============|/=//// :: Spring Boot ::                (v2.5.6) 2022-01-19 22:14:49.283  INFO 41808 --- [           main] c.l.kafka.consumer.BotLogsApplication    : No active profile set, falling back to default profiles: default 2022-01-19 22:14:49.903  INFO 41808 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 9002 (http) 2022-01-19 22:14:49.910  INFO 41808 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat] 2022-01-19 22:14:49.910  INFO 41808 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.54] 2022-01-19 22:14:49.974  INFO 41808 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext 2022-01-19 22:14:49.974  INFO 41808 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 667 ms 2022-01-19 22:14:50.048  INFO 41808 --- [           main] c.l.k.consumer.config.DefaultDbConfig    : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/change-me","driver":"org.postgresql.Driver","user":"change-me","password":"*","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}] 2022-01-19 22:14:50.242 DEBUG 41808 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice2022-01-19 22:14:50.175 DEBUG 41808 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice 2022-01-19 22:14:50.214 DEBUG 41808 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping' 2022-01-19 22:14:50.236 DEBUG 41808 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Patterns [/webjars/, /] in 'resourceHandlerMapping' 2022-01-19 22:14:50.242 DEBUG 41808 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice 2022-01-19 22:14:50.367  INFO 41808 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-group_id-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = group_id group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 socket.connection.setup.timeout.max.ms = 127000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2022-01-19 22:14:50.406  INFO 41808 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1 2022-01-19 22:14:50.407  INFO 41808 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457 2022-01-19 22:14:50.407  INFO 41808 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1642619690406 2022-01-19 22:14:50.408  INFO 41808 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): users 2022-01-19 22:14:50.422  INFO 41808 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 9002 (http) with context path '' 2022-01-19 22:14:50.429  INFO 41808 --- [           main] c.l.kafka.consumer.BotLogsApplication    : Started BotLogsApplication in 1.413 seconds (JVM running for 1.876) 2022-01-19 22:14:50.551  INFO 41808 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: O9iXkXIMQpKE3DgrEQtJ5w 2022-01-19 22:14:50.552  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator omen:9092 (id: 2147483647 rack: null) 2022-01-19 22:14:50.553  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group 2022-01-19 22:14:50.560  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group 2022-01-19 22:14:50.562  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=17, memberId='consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b', protocol='range'} 2022-01-19 22:14:50.563  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 17: {consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b=Assignment(partitions=[users-0])} 2022-01-19 22:14:50.632  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=17, memberId='consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b', protocol='range'} 2022-01-19 22:14:50.633  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[users-0]) 2022-01-19 22:14:50.637  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: users-0 2022-01-19 22:14:50.651  INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition users-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omen:9092 (id: 0 rack: null)], epoch=0}} 2022-01-19 22:14:50.652  INFO 41808 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group_id: partitions assigned: [users-0]

— Открываем телеграм и пробуем на вкус наши закуски

  • Пишем — /start, начинаем тест и видим, что бот работает !

Общение с ботом в Телеграм
Общение с ботом в Телеграм

— Давайте посмотрим, что же нам написал Spring в логах, отловил ли наш consumer данные из Kafka и сделал ли записи в БД ?

Логи нашего consumer, ошибок не наблюдается
  .   ____          _            __ _ _ /\ / ' __ _ () __  __ _ \ \ \  ( ( )__ | '_ | '| | ' / ` | \ \ \  \/  )| |)| | | | | || (| |  ) ) ) ) '  || .__|| ||| |_, | / / / / =========||==============|/=//// :: Spring Boot ::                (v2.5.6) 2022-01-19 22:21:26.142  INFO 42281 --- [           main] c.l.kafka.consumer.BotLogsApplication    : No active profile set, falling back to default profiles: default 2022-01-19 22:21:27.195  INFO 42281 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 9002 (http) 2022-01-19 22:21:27.201  INFO 42281 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat] 2022-01-19 22:21:27.201  INFO 42281 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.54] 2022-01-19 22:21:27.245  INFO 42281 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext 2022-01-19 22:21:27.246  INFO 42281 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1030 ms 2022-01-19 22:21:27.329  INFO 42281 --- [           main] c.l.k.consumer.config.DefaultDbConfig    : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"postgres","password":"*","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}] 2022-01-19 22:21:27.561 DEBUG 42281 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice2022-01-19 22:21:27.490 DEBUG 42281 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice 2022-01-19 22:21:27.524 DEBUG 42281 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping' 2022-01-19 22:21:27.551 DEBUG 42281 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Patterns [/webjars/, /] in 'resourceHandlerMapping' 2022-01-19 22:21:27.561 DEBUG 42281 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice 2022-01-19 22:21:27.726  INFO 42281 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-group_id-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = group_id group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 socket.connection.setup.timeout.max.ms = 127000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2022-01-19 22:21:27.772  INFO 42281 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1 2022-01-19 22:21:27.772  INFO 42281 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457 2022-01-19 22:21:27.772  INFO 42281 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1642620087771 2022-01-19 22:21:27.774  INFO 42281 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): users 2022-01-19 22:21:27.787  INFO 42281 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 9002 (http) with context path '' 2022-01-19 22:21:27.794  INFO 42281 --- [           main] c.l.kafka.consumer.BotLogsApplication    : Started BotLogsApplication in 2.184 seconds (JVM running for 2.825) 2022-01-19 22:21:27.964  INFO 42281 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: O9iXkXIMQpKE3DgrEQtJ5w 2022-01-19 22:21:27.965  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator omen:9092 (id: 2147483647 rack: null) 2022-01-19 22:21:27.974  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group 2022-01-19 22:21:27.988  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group 2022-01-19 22:21:27.993  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=19, memberId='consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066', protocol='range'} 2022-01-19 22:21:27.994  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 19: {consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066=Assignment(partitions=[users-0])} 2022-01-19 22:21:28.000  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=19, memberId='consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066', protocol='range'} 2022-01-19 22:21:28.002  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[users-0]) 2022-01-19 22:21:28.003  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: users-0 2022-01-19 22:21:28.011  INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition users-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omen:9092 (id: 0 rack: null)], epoch=0}} 2022-01-19 22:21:28.012  INFO 42281 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group_id: partitions assigned: [users-0] 2022-01-19 22:21:29.407  INFO 42281 --- [ntainer#0-0-C-1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting... 2022-01-19 22:21:29.566  INFO 42281 --- [ntainer#0-0-C-1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed. 2022-01-19 22:22:17.027  INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer   : #### Consumed received message [Writing in log -> команда: /start] 2022-01-19 22:22:20.652  INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer   : #### Consumed received message [Writing in log -> мысль: Как же хочется написать статью на Хабр !!!] 2022-01-19 22:22:25.344  INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer   : #### Consumed received message [Writing in log -> мысль: Может написать статью о боте в Телеграмм ?] 2022-01-19 22:22:30.394  INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer   : #### Consumed received message [Writing in log -> мысль: Написать статью!] 2022-01-19 22:22:35.652  INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer   : #### Consumed received message [Writing in log -> команда: /idea]

— Сообщения, отправленные producer в Kafka, были обработаны нашим consumer и записаны в БД

Записи в БД
Записи в БД

— Далее, по инструкции из первой статьи, откройте окно Сonsume from Topic, здесь показаны прилетевшие в Kafka сообщения

Как и в первой статье, мы убедились, что приложение корректно работает, сообщения благополучно прилетели в Kafka, были отловлены, обработаны и записаны в БД

Прилетевшие в Kafka сообщения
Прилетевшие в Kafka сообщения

Вот и все, надеюсь, что у всех получилось повторить туториал в первого раза, в будущем будет еще много интересного, всем спасибо.


ссылка на оригинал статьи https://habr.com/ru/post/656573/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *