Иванов Максим
Младший Java программист
Рецепт легкого перекуса для «Telegram — монстра Франкенштейна»

Всем привет, это вторая часть создания телеграм-бота (ссылка на первую часть), в ней мы реализуем Kafka Consumer, который будет ловить любые колебания в силе и выдавать нам всю информацию о действиях пользователя.
Ингредиенты:
-
Создание Spring Boot проект, проще всего это сделать через Spring Initializr. (в качестве системы сборки будет использоваться Gradle)
-
PostgreSQL (для комфортной работы я использую DBeaver)
Если возникнут сложности с воссозданием туториала
Начинаем с яичных желтков:
Первостепенно нужно настроить build.gradle со всеми зависимостями
build.gradle
plugins { id 'org.springframework.boot' version '2.5.6' id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'java' } group = 'com.demo.kafka' version = '0.0.1-SNAPSHOT' sourceCompatibility = '14' repositories { mavenCentral() } configurations.all { exclude module: 'slf4j-log4j12' } dependencies { implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6' implementation 'org.springframework.kafka:spring-kafka:2.7.6' implementation 'org.projectlombok:lombok:1.18.22' implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6' implementation 'org.springframework.data:spring-data-commons:2.6.0' implementation 'org.postgresql:postgresql:42.3.1' implementation 'com.h2database:h2:1.4.200' testImplementation 'org.springframework.boot:spring-boot-starter-test:2.5.6' testImplementation 'org.springframework.kafka:spring-kafka-test:2.7.6' compileOnly 'org.projectlombok:lombok:1.18.22' annotationProcessor 'org.projectlombok:lombok:1.18.22' } test { useJUnitPlatform() }
Далее для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka consumer
application.yml
server: port: 9002 spring: kafka: consumer: bootstrap-servers: localhost:9092 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Ну и в конце — настройки application.properties
application.properties
# HTTP port for incoming requests server.port=8080 # Log db app.db.demo.url=jdbc:postgresql://localhost:5432/change-me app.db.demo.driver=org.postgresql.Driver app.db.demo.user=change-me app.db.demo.password=change-me app.db.demo.pool-size=10 # kafka-metadata-consumer app.metadata.tag=logs app.metadata.bootstrapServers=athena:9092 app.metadata.groupId=group_id app.metadata.topic=users app.metadata.autoOffsetReset=earliest app.metadata.enableAutoCommit=false app.metadata.maxPollRecords=10 app.metadata.concurrency=4 app.metadata.path=files # logging logging.level.root=INFO logging.level.org.springframework.web=DEBUG logging.level.ru.centerinform.webhook=TRACE logging.file.name=change-me
Хорошо, говоря о структуре проекта, то советую придерживаться такого вида:

Пакеты:
-
config — описание бинов и конфигурации проекта
-
controller — обрабатывает запрос пользователя
-
model — хранит модель данных, а так же описывает маппер для этой модели
-
repository — логика работа с БД
-
service — основная бизнес логика проекта
Намазываем на тост, посыпаем сыром и кидаем в духовку:
Так как настройка бинов, можно сказать — шаблонный код, за основу она идентична с первой частью, так что, не удивляйтесь сходству.
Настройки бинов:
— Первым делом в пакете config прописываем конфигурацию бинов нашего приложения, тут настройки инициализации JdbcTemplate, так же, обратите внимание, что внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc
DbConfig
@Configuration public class DbConfig extends DefaultDbConfig { @Bean @Qualifier("demo") @ConfigurationProperties(prefix = "app.db.demo") SpringDataJdbcProperties demoJdbcProperties() { return new SpringDataJdbcProperties(); } @Bean @Qualifier("demo") public DataSource demoDataSource(@Qualifier("demo") SpringDataJdbcProperties properties) { return hikariDataSource("db", properties); } @Bean @Qualifier("demo") JdbcTemplate demoJdbcTemplate(@Qualifier("demo") DataSource dataSource) { return new JdbcTemplate(dataSource); } @Data @NoArgsConstructor public static class SpringDataJdbcProperties { // constants private static final String H2_DATABASE_DRIVER = "org.h2.Driver"; /** * JDBC URL property */ String url; /** * JDBC driver class name property */ String driver; /** * JDBC username property */ String user; /** * JDBC password property */ String password; /** * Hikari / Vertica maxPoolSize property */ String poolSize; /** * Minimum pool size */ int minPoolSize = 4; /** * Maximum pool size */ int maxPoolSize = 10; /** * This property controls the maximum amount of time (in milliseconds) that a connection is allowed to * sit idle in the pool. A value of 0 means that idle connections are never removed from the pool. */ long idleTimeout; /** * This property controls the maximum lifetime of a connection in the pool. When a connection * reaches this timeout, even if recently used, it will be retired from the pool. * An in-use connection will never be retired, only when it is idle will it be removed */ long maxLifetime; /** * Bulk insert size */ Integer bulkSize; /** * All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging) * * @param url JDBC driver class name property * @param driver JDBC driver class name property * @param user JDBC username property * @param password JDBC password property * @param poolSize Hikari / Vertica maxPoolSize property * @param bulkSize bulk insert size */ public SpringDataJdbcProperties( String url, String driver, String user, String password, String poolSize, Integer bulkSize) { this.url = url; this.driver = driver; this.user = user; this.password = password; this.poolSize = poolSize; this.bulkSize = bulkSize; } /** * Возвращает истину, если экземпляр описывает in-memory H2 database * * @return истина, если экземпляр описывает in-memory H2 database */ public boolean isH2Database() { return driver.equals(H2_DATABASE_DRIVER); } /** * Возвращает строковое представление экземпляра объекта в формате JSON * * @return строковое представление экземпляра объекта в формате JSON */ @Override public String toString() { var props = new SpringDataJdbcProperties( url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize); return Json.encode(props); } } }
— Создадим базовый класс для уменьшения дублирования кода инициализации бинов
DefaultDbConfig
@Slf4j class DefaultDbConfig { protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) { log.info("[{}] настройки БД: [{}]", tag, properties.toString()); HikariDataSource ds = new HikariDataSource(); ds.setJdbcUrl(properties.getUrl()); ds.setDriverClassName(properties.getDriver()); ds.setUsername(properties.getUser()); ds.setPassword(properties.getPassword()); ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize())); return ds; } }
— После, напишем утилитный класс для логирования
Json
public class Json { static final ObjectMapper mapper = new ObjectMapper(); /** * Encode instance as JSON * * @param obj instance * @return JSON */ public static String encode(Object obj) { try { return mapper.writeValueAsString(obj); } catch (JsonProcessingException e) { return obj.toString(); } } public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException { return mapper.readValue(json, clazz); } }
Создание модели данных
— Создаем модель для обработки логов ConsumerLog, а так же маппер ConsumerMapper, который понадобиться для работы с БД и описывания полей в таблице
ConsumerLog
@Data @RequiredArgsConstructor public class ConsumerLog { @JsonProperty("id") @JsonIgnoreProperties(ignoreUnknown = true) private final int id; @JsonProperty("message") @JsonIgnoreProperties(ignoreUnknown = true) private final String msg; @JsonProperty("topic") @JsonIgnoreProperties(ignoreUnknown = true) private final String topic; @JsonProperty("logDate") @JsonIgnoreProperties(ignoreUnknown = true) private final LocalDate logDate; @Override public String toString() { return "Was added log [id=" + id + ", topic=" + topic + "log=" + msg + ", date=" + logDate.toString() + "]"; } }
ConsumerMapper
@Slf4j public class ConsumerMapper implements RowMapper<ConsumerLog> { @Override public ConsumerLog mapRow(ResultSet rs, int rowNum) throws SQLException { var date = rs.getDate("date_time"); var entity = new ConsumerLog( rs.getInt("id"), rs.getString("message"), rs.getString("topic"), date == null ? null : date.toLocalDate() ); log.trace("ConsumerMapper(): entity = [{}]", entity); return entity; } }
После создания модели данных и ее маппера, приступаем к репозиториям
— Создадим интерфейс, который описывает методы, для работы с записями в БД
IConsumerLogRepository
public interface IConsumerLogRepository { /** * Возвращает список записей * * @return список всех записей * @throws DbException в случае ошибки БД */ List<ConsumerLog> getLogsList(); /** * Вставка новой записи * * @param entity новая запись * @throws DbException в случае ошибки БД */ void insert(ConsumerLog entity); }
— Теперь напишем класс, который реализует методы интерфейса
ConsumerLogRepository
@Repository @Slf4j public class ConsumerLogRepository implements IConsumerLogRepository { private static final String SQL_SELECT_LIST = "SELECT id, message, date_time, topic FROM log"; private static final String SQL_INSERT = "INSERT INTO log (message, date_time, topic) VALUES (?, ?, ?)"; protected final static ConsumerMapper CONSUMER_LOG_MAPPER = new ConsumerMapper(); protected final JdbcTemplate template; public ConsumerLogRepository(@Qualifier("demo") JdbcTemplate template) { this.template = template; } /** * Возвращает записи элемента из таблицы логов подписчика */ @Override public List<ConsumerLog> getLogsList() throws DbException { return template.query(SQL_SELECT_LIST, CONSUMER_LOG_MAPPER); } /** * Заполняет записи элементами из приходящего топика логов */ @Override public void insert(ConsumerLog entity) throws DbException { var result = template.update(SQL_INSERT, entity.getMsg(), entity.getLogDate(), entity.getTopic()); if (result != 1) log.trace("ConsumerLogRepository.insert() with {} rows inserted", entity); log.trace("insert({}) result={}", entity, result); } }
Ну и главный элемент бизнес логики приложения — kafka consumer
— Это класс подписчик, он получает сообщения из Kafka и обрабатывает их
Consumer
@Slf4j @Service @AllArgsConstructor public class Consumer { private static final String TOPIC_NAME = "users"; protected final IConsumerLogRepository consumerRepo; /** * Метод обработки сообщений от producer, * который "отлавливает" эти самые сообщения с помощью аннотации KafkaListener и принимает их в виде параметра. * * @param message сообщение от producer, которое прилетает в кафка */ @KafkaListener(topics = TOPIC_NAME, groupId = "group_id") public void consumeWriting(String message) { var consumerLog = new ConsumerLog(0, message, TOPIC_NAME, LocalDate.now()); consumerRepo.insert(consumerLog); log.info("#### Consumed received message [{}]", message); } /** * Получение списка логов из БД */ public List<ConsumerLog> consumeLog() { var list = consumerRepo.getLogsList(); list.forEach(msg -> log.info("#### Consumer list log [{}]", msg.toString())); return list; } }
Далее, как и в прошлой статье, мы напишем контроллер, для доступа к сервису из вне
— Создаем простенький контроллер, для получения списка логов из БД
TestController
@Slf4j @RestController @AllArgsConstructor @RequestMapping(value = "/kafka") public class TestController { private final Consumer consumerService; /** * Возвращает записи элемента из таблицы логов подписчика * */ @GetMapping(value = "/log_list") public String getLogList() { log.trace("[GET] getLogList()"); return consumerService.consumeLog().toString(); } }
В заключении, класс, который собственно и запускает все наше приложение
BotLogsApplication
@SpringBootApplication public class BotLogsApplication { public static void main(String[] args) { SpringApplication.run(BotLogsApplication.class, args); } }
Перед тем, как вытаскивать из духовки наше блюдо, нужно подготовиться:
— Сначала проверим, запущена ли Kafka

— После, запускаем Conductor и видим, что у нас работает топик users

— Далее запускаем DBeaver и благодаря первой статье, у нас уже заранее создано 2 таблицы (log и user_table), схема создания таблиц из первой части
Отлично, вынимаем наши тосты из духовки:
— Запускаем проект, проверяем, что все настроено и корректно работает
Логи запущенного приложения
. ____ _ __ _ _ /\ / ' __ _ () __ __ _ \ \ \ ( ( )__ | '_ | '| | ' / ` | \ \ \ \/ )| |)| | | | | || (| | ) ) ) ) ' || .__|| ||| |_, | / / / / =========||==============|/=//// :: Spring Boot :: (v2.5.6) 2022-01-19 22:14:49.283 INFO 41808 --- [ main] c.l.kafka.consumer.BotLogsApplication : No active profile set, falling back to default profiles: default 2022-01-19 22:14:49.903 INFO 41808 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9002 (http) 2022-01-19 22:14:49.910 INFO 41808 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2022-01-19 22:14:49.910 INFO 41808 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.54] 2022-01-19 22:14:49.974 INFO 41808 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2022-01-19 22:14:49.974 INFO 41808 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 667 ms 2022-01-19 22:14:50.048 INFO 41808 --- [ main] c.l.k.consumer.config.DefaultDbConfig : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/change-me","driver":"org.postgresql.Driver","user":"change-me","password":"*","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}] 2022-01-19 22:14:50.242 DEBUG 41808 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice2022-01-19 22:14:50.175 DEBUG 41808 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice 2022-01-19 22:14:50.214 DEBUG 41808 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping' 2022-01-19 22:14:50.236 DEBUG 41808 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Patterns [/webjars/, /] in 'resourceHandlerMapping' 2022-01-19 22:14:50.242 DEBUG 41808 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice 2022-01-19 22:14:50.367 INFO 41808 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-group_id-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = group_id group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 socket.connection.setup.timeout.max.ms = 127000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2022-01-19 22:14:50.406 INFO 41808 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1 2022-01-19 22:14:50.407 INFO 41808 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457 2022-01-19 22:14:50.407 INFO 41808 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1642619690406 2022-01-19 22:14:50.408 INFO 41808 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): users 2022-01-19 22:14:50.422 INFO 41808 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9002 (http) with context path '' 2022-01-19 22:14:50.429 INFO 41808 --- [ main] c.l.kafka.consumer.BotLogsApplication : Started BotLogsApplication in 1.413 seconds (JVM running for 1.876) 2022-01-19 22:14:50.551 INFO 41808 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: O9iXkXIMQpKE3DgrEQtJ5w 2022-01-19 22:14:50.552 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator omen:9092 (id: 2147483647 rack: null) 2022-01-19 22:14:50.553 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group 2022-01-19 22:14:50.560 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group 2022-01-19 22:14:50.562 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=17, memberId='consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b', protocol='range'} 2022-01-19 22:14:50.563 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 17: {consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b=Assignment(partitions=[users-0])} 2022-01-19 22:14:50.632 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=17, memberId='consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b', protocol='range'} 2022-01-19 22:14:50.633 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[users-0]) 2022-01-19 22:14:50.637 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: users-0 2022-01-19 22:14:50.651 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition users-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omen:9092 (id: 0 rack: null)], epoch=0}} 2022-01-19 22:14:50.652 INFO 41808 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group_id: partitions assigned: [users-0]
— Открываем телеграм и пробуем на вкус наши закуски
-
Пишем — /start, начинаем тест и видим, что бот работает !

— Давайте посмотрим, что же нам написал Spring в логах, отловил ли наш consumer данные из Kafka и сделал ли записи в БД ?
Логи нашего consumer, ошибок не наблюдается
. ____ _ __ _ _ /\ / ' __ _ () __ __ _ \ \ \ ( ( )__ | '_ | '| | ' / ` | \ \ \ \/ )| |)| | | | | || (| | ) ) ) ) ' || .__|| ||| |_, | / / / / =========||==============|/=//// :: Spring Boot :: (v2.5.6) 2022-01-19 22:21:26.142 INFO 42281 --- [ main] c.l.kafka.consumer.BotLogsApplication : No active profile set, falling back to default profiles: default 2022-01-19 22:21:27.195 INFO 42281 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9002 (http) 2022-01-19 22:21:27.201 INFO 42281 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2022-01-19 22:21:27.201 INFO 42281 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.54] 2022-01-19 22:21:27.245 INFO 42281 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2022-01-19 22:21:27.246 INFO 42281 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1030 ms 2022-01-19 22:21:27.329 INFO 42281 --- [ main] c.l.k.consumer.config.DefaultDbConfig : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"postgres","password":"*","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}] 2022-01-19 22:21:27.561 DEBUG 42281 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice2022-01-19 22:21:27.490 DEBUG 42281 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice 2022-01-19 22:21:27.524 DEBUG 42281 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping' 2022-01-19 22:21:27.551 DEBUG 42281 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Patterns [/webjars/, /] in 'resourceHandlerMapping' 2022-01-19 22:21:27.561 DEBUG 42281 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice 2022-01-19 22:21:27.726 INFO 42281 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-group_id-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = group_id group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 socket.connection.setup.timeout.max.ms = 127000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2022-01-19 22:21:27.772 INFO 42281 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1 2022-01-19 22:21:27.772 INFO 42281 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457 2022-01-19 22:21:27.772 INFO 42281 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1642620087771 2022-01-19 22:21:27.774 INFO 42281 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): users 2022-01-19 22:21:27.787 INFO 42281 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9002 (http) with context path '' 2022-01-19 22:21:27.794 INFO 42281 --- [ main] c.l.kafka.consumer.BotLogsApplication : Started BotLogsApplication in 2.184 seconds (JVM running for 2.825) 2022-01-19 22:21:27.964 INFO 42281 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: O9iXkXIMQpKE3DgrEQtJ5w 2022-01-19 22:21:27.965 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator omen:9092 (id: 2147483647 rack: null) 2022-01-19 22:21:27.974 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group 2022-01-19 22:21:27.988 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group 2022-01-19 22:21:27.993 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=19, memberId='consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066', protocol='range'} 2022-01-19 22:21:27.994 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 19: {consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066=Assignment(partitions=[users-0])} 2022-01-19 22:21:28.000 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=19, memberId='consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066', protocol='range'} 2022-01-19 22:21:28.002 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[users-0]) 2022-01-19 22:21:28.003 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: users-0 2022-01-19 22:21:28.011 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition users-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omen:9092 (id: 0 rack: null)], epoch=0}} 2022-01-19 22:21:28.012 INFO 42281 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group_id: partitions assigned: [users-0] 2022-01-19 22:21:29.407 INFO 42281 --- [ntainer#0-0-C-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting... 2022-01-19 22:21:29.566 INFO 42281 --- [ntainer#0-0-C-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed. 2022-01-19 22:22:17.027 INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer : #### Consumed received message [Writing in log -> команда: /start] 2022-01-19 22:22:20.652 INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer : #### Consumed received message [Writing in log -> мысль: Как же хочется написать статью на Хабр !!!] 2022-01-19 22:22:25.344 INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer : #### Consumed received message [Writing in log -> мысль: Может написать статью о боте в Телеграмм ?] 2022-01-19 22:22:30.394 INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer : #### Consumed received message [Writing in log -> мысль: Написать статью!] 2022-01-19 22:22:35.652 INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer : #### Consumed received message [Writing in log -> команда: /idea]
— Сообщения, отправленные producer в Kafka, были обработаны нашим consumer и записаны в БД

— Далее, по инструкции из первой статьи, откройте окно Сonsume from Topic, здесь показаны прилетевшие в Kafka сообщения
Как и в первой статье, мы убедились, что приложение корректно работает, сообщения благополучно прилетели в Kafka, были отловлены, обработаны и записаны в БД

Вот и все, надеюсь, что у всех получилось повторить туториал в первого раза, в будущем будет еще много интересного, всем спасибо.
ссылка на оригинал статьи https://habr.com/ru/post/656573/
Добавить комментарий