Создание телеграмм-бота (Spring Boot, Kafka, PostgreSQL), часть первая

от автора

Иванов Максим

Младший Java программист

Рецепт по приготовлению своего «Telegram-Франкенштейна»

Даже человек средних способностей, упорно занимаясь одним предметом, непременно достигнет в нем глубоких познаний. - «Франкенштейн» Мэри Шелли
Даже человек средних способностей, упорно занимаясь одним предметом, непременно достигнет в нем глубоких познаний. — «Франкенштейн» Мэри Шелли

Всем привет, данная статья является, своего рода моей первой, но все же постараюсь максимально просто рассказать вам о том, как создать бота, прикрутив к нему все обещанные выше свистелки-тарахтелки.

Статьи будут разделены на 2 части, первая часть — создание основного бота с оправкой логов (Kafka Producer) и записью их в БД, вторая часть — обработка всех логов (Kafka Consumer).

Ингредиенты:

  1. Регистрация бота

  2. Создание Spring Boot проект, проще всего это сделать через встроенный конфигуратор в IntelliJ IDEA, либо используя Spring Initializr. (в качестве системы сборки будет использоваться Gradle)

  3. Kafka (для отслеживания топиков я использую Conductor)

  4. PostgreSQL (для комфортной работы я использую DBeaver)

Если возникнут сложности с воссозданием туториала

Прошу пишите в коментариях возникшие проблемы, на всякий случай — вот мой git

Начинаем с нарезки:

Первостепенно нужно настроить build.grable со всеми зависимостями

build.grable
buildscript {     repositories {         mavenCentral()     } }  plugins {     id 'org.springframework.boot' version '2.4.2'     id 'io.spring.dependency-management' version '1.0.11.RELEASE'     id 'java' }  apply from: 'build-test.gradle'  group 'com.sercetary.bot' sourceCompatibility = '14'  configurations {     compileOnly {         extendsFrom annotationProcessor     } }  repositories {     mavenCentral() }  configurations.all {     exclude module: 'slf4j-log4j12' }  dependencies {     implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6'     implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6'     implementation 'org.springframework.data:spring-data-commons:2.6.0'     implementation 'org.springframework.kafka:spring-kafka:2.7.6'     implementation 'org.postgresql:postgresql:42.3.1'     implementation 'com.h2database:h2:1.4.200'      implementation group: 'org.telegram', name: 'telegrambots-abilities', version: '5.3.0'     implementation group: 'org.telegram', name: 'telegrambots', version: '5.3.0'      compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.29'     compileOnly 'org.projectlombok:lombok:1.18.22'     annotationProcessor 'org.projectlombok:lombok:1.18.22' }

Далее сразу для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka producer

application.yml
server:   port: 9000 spring:   kafka:     producer:       bootstrap-servers: localhost:9092       key-serializer: org.apache.kafka.common.serialization.StringSerializer       value-serializer: org.apache.kafka.common.serialization.StringSerializer

Теперь настройки application.properties

application.properties
# HTTP port for incoming requests server.port=8081  app.http.bot=change-me telegram-bot.name=change-me telegram-bot.token=change-me  # Bot db app.db.bot-db.url=jdbc:postgresql://localhost:5432/change-me app.db.bot-db.driver=org.postgresql.Driver app.db.bot-db.user=change-me app.db.bot-db.password=change-me app.db.bot-db.pool-size=10  # logging logging.level.root=INFO logging.level.org.springframework.web=DEBUG logging.level.ru.centerinform.webhook=TRACE logging.file.name=change-me 

Хорошо, после настроек нашего проекта, давайте обговорим его структуру:

Структура проекта
Структура проекта

Пакеты:

  • config — описание бинов и конфигурации проекта

  • controller — обрабатывает запрос пользователя

  • dto — хранит данные, а так же описывает модель таблицы БД

  • exceptions — кастомный пакет обработчика ошибок

  • repository — логика работа с БД

  • service — основная бизнес логика проекта

Сейчас мы собираем игредиенты и маринуем:

Настройки бинов:

— Первым делом прописываем конфигурация бинов нашего приложения в пакете config, тут настройки инициализации TelegramBotsApi и ObjectMapper

AppConfig
@Configuration public class AppConfig {      @Bean     ObjectMapper customObjectMapper() {         return new ObjectMapper();     }      @Bean     TelegramBotsApi telegramBotsApi() throws TelegramApiException{         return new TelegramBotsApi(DefaultBotSession.class);     } }

— Внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc

DbConfig
@Configuration public class DbConfig extends DefaultDbConfig {      @Bean     @Qualifier("bot-db")     @ConfigurationProperties(prefix = "app.db.bot-db")     SpringDataJdbcProperties gitlabJdbcProperties() {         return new SpringDataJdbcProperties();     }      @Bean     @Qualifier("bot-db")     public DataSource gitlabDataSource(@Qualifier("bot-db") SpringDataJdbcProperties properties) {         return hikariDataSource("db", properties);     }      @Bean     @Qualifier("bot-db")     JdbcTemplate gitlabJdbcTemplate(@Qualifier("bot-db") DataSource dataSource) {         return new JdbcTemplate(dataSource);     }      @Data     @NoArgsConstructor     public static class SpringDataJdbcProperties {          // constants         private static final String H2_DATABASE_DRIVER = "org.h2.Driver";          /**          * JDBC URL property          */         String url;         /**          * JDBC driver class name property          */         String driver;         /**          * JDBC username property          */         String user;         /**          * JDBC password property          */         String password;         /**          * Hikari / Vertica maxPoolSize property          */         String poolSize;         /**          * Minimum pool size          */         int minPoolSize = 4;         /**          * Maximum pool size          */         int maxPoolSize = 10;         /**          * This property controls the maximum amount of time (in milliseconds) that a connection is allowed to          * sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.          */         long idleTimeout;         /**          * This property controls the maximum lifetime of a connection in the pool. When a connection          * reaches this timeout, even if recently used, it will be retired from the pool.          * An in-use connection will never be retired, only when it is idle will it be removed          */         long maxLifetime;         /**          * Bulk insert size          */         Integer bulkSize;           /**          * All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)          *          * @param url JDBC driver class name property          * @param driver JDBC driver class name property          * @param user JDBC username property          * @param password JDBC password property          * @param poolSize Hikari / Vertica maxPoolSize property          * @param bulkSize bulk insert size          */         public SpringDataJdbcProperties(                 String url, String driver, String user, String password, String poolSize, Integer bulkSize) {             this.url = url;             this.driver = driver;             this.user = user;             this.password = password;             this.poolSize = poolSize;             this.bulkSize = bulkSize;         }           /**          * Возвращает истину, если экземпляр описывает in-memory H2 database          *          * @return истина, если экземпляр описывает in-memory H2 database          */         public boolean isH2Database() {             return driver.equals(H2_DATABASE_DRIVER);         }          /**          * Возвращает строковое представление экземпляра объекта в формате JSON          *          * @return строковое представление экземпляра объекта в формате JSON          */         @Override         public String toString() {             var props = new SpringDataJdbcProperties(                     url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);             return Json.encode(props);         }      }  }

— Создадим базовый класс для уменьшения дублирования кода инициализации бинов

DefaultDbConfig
@Slf4j class DefaultDbConfig {      protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {         log.info("[{}] настройки БД: [{}]", tag, properties.toString());          HikariDataSource ds = new HikariDataSource();         ds.setJdbcUrl(properties.getUrl());         ds.setDriverClassName(properties.getDriver());         ds.setUsername(properties.getUser());         ds.setPassword(properties.getPassword());         ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));         return ds;     } }

— После напишем утилитный класс для логирования

Json
public class Json {     static final ObjectMapper mapper = new ObjectMapper();      /**      * Encode instance as JSON      *      * @param obj instance      * @return JSON      */     public static String encode(Object obj) {         try {             return mapper.writeValueAsString(obj);         } catch (JsonProcessingException e) {             return obj.toString();         }     }      public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException {         return mapper.readValue(json, clazz);     }

Далее мы напишем контроллер, для доступа к сервису из вне

— Создаем простенький контроллер, для получения списка записей из БД

UsersController
@Slf4j @RestController @RequestMapping("${app.http.bot") @RequiredArgsConstructor @SuppressWarnings("unused") public class UsersController {      private final UserService userService;      /**      * Возвращает список пользователей и связанных с ними планами      */     @RequestMapping(path = "/users_idea", method = RequestMethod.GET)     public List<User> getIdeaList() {         log.debug("Method - getIdeaList was called");         return userService.getUserList();     } }

После переходим к созданию модели

— Создаем модель пользователя User, а так же его маппер UserMapper, который понадобиться для работы с БД и маппинга полей в таблице

User
@Data @RequiredArgsConstructor public class User {     /**      * user's id      */     @JsonProperty("id")     private final int id;     /**      * user's name      */     @JsonProperty("name")     private final String name;     /**      * description      */     @JsonProperty("description")     private final String description;      private String startWord = "";      @Override     public String toString() { return startWord + description; } }
UserMapper
@Slf4j public class UserMapper implements RowMapper<User> {      @Override     public User mapRow(ResultSet rs, int rowNum) throws SQLException {         var entity = new User(                 rs.getInt("id"),                 rs.getString("user_name"),                 rs.getString("description")                 );         log.trace("mapRow(): entity = [{}]", entity);         return entity;     } }

Переходим к созданию кастомных exception

Для чего они нужны

Их мы используем для обработки ошибок, которые могут произойти в процессе работы приложения, чтобы бот не сломался и продолжил свою работу.

— BaseException — класс, который наследуется от RuntimeException, в конструкторе принимает 2 параметра — сообщение и тело ошибки

BaseException
@Slf4j public class BaseException extends RuntimeException{      public BaseException(String msg, Throwable t) {         super(msg, t);         log.error(msg, t);     }      public BaseException(String msg) {         super(msg);         log.error(msg);     }  }

— NotFoundException — класс, который вывзывается, когда ответ не найден, наследуется от BaseException

NotFoundException
@ResponseStatus(HttpStatus.NOT_FOUND) public class NotFoundException extends BaseException {      private final static String MESSAGE = "Not Found";      public NotFoundException(Throwable t) {         super(MESSAGE, t);     }      public NotFoundException() {         super(MESSAGE);     } }

— DbException — класс, который обрабатыевает ошибки связанные с БД, наследуется от RuntimeException

DbException
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) public class DbException extends RuntimeException {      private static final String MESSAGE = "Ошибка БД";      public DbException(String message) {         super(message);     }      public DbException(Throwable cause) {         super(MESSAGE, cause);     } }

Теперь для работы с БД, создаем repository

— Создадим интерфейс, который описывает методы, для работы с записями в БД

IUserRepository
public interface IUserRepository {      /**      * Возвращает список записей по id      *      * @return запрашиваемая запись      * @throws DbException в случае ошибки БД      */     User getById(int id);      /**      * Возвращает список записей      *      * @return список всех записей      * @throws DbException в случае ошибки БД      */     List<User> getUserList();      /**      * Вставка новой записи      *      * @param entity новая запись      * @throws DbException в случае ошибки БД      */     void insert(User entity);      /**      * Удаление записи      *      * @param entity удаляемая запись      * @throws DbException в случае ошибки БД      */     void delete(User entity); }

— Теперь напишем класс, который реализует методы интерфейса

UserRepository
@Slf4j @Repository public class UserRepository implements IUserRepository {      // constants     private static final String SQL_SELECT_BY_NAME = "" +             "SELECT id, user_name, description FROM user_table WHERE id=?";     private static final String SQL_SELECT_LIST = "" +             "SELECT id, user_name, description FROM user_table";     private static final String SQL_INSERT = "" +             "INSERT INTO user_table (user_name, description) VALUES (?, ?)";     private static final String SQL_DELETE = "" +             "DELETE FROM user_table WHERE id = ?";      protected final static UserMapper USER_MAPPER = new UserMapper();      // beans     protected final JdbcTemplate template;       /**      * Req-args constructor for Spring DI      */     public UserRepository(@Qualifier("bot-db") JdbcTemplate template) {         this.template = template;     }      /**      * Возвращает список записей по id      *      * @return запрашиваемая запись      * @throws DbException в случае ошибки БД      */     @Override     public User getById(int id) throws DbException {         try {             return DataAccessUtils.singleResult(                     template.query(SQL_SELECT_BY_NAME, USER_MAPPER, id));         } catch (DataAccessException exception) {             throw new DbException(exception);         }     }      /**      * Возвращает список записей      *      * @return запрашиваемая запись      * @throws DbException в случае ошибки БД      */     @Override     public List<User> getUserList() throws DbException {         try {             return template.query(SQL_SELECT_LIST, USER_MAPPER);         } catch (DataAccessException exception) {             throw new DbException(exception);         }     }      /**      * Вставка новой записи      *      * @param entity новая запись      * @throws DbException в случае ошибки БД      */     @Override     public void insert(User entity) throws DbException {         try {             // В параметры запроса все поля сущности кроме идентификатора, т.к. он serial и генерируется автоматом             var result = template.update(SQL_INSERT,                     entity.getName(),                     entity.getDescription());             if (result != 1) log.trace("UserRepository.update() with {} rows inserted", entity);             log.info("insert({}) result={}", entity, result);         } catch (DataAccessException exception) {             throw new DbException(exception);         }     }      /**      * Удаление записи      *      * @param entity удаляемая запись      * @throws DbException в случае ошибки БД      */     @Override     public void delete(User entity) throws DbException {         try {             var result = template.update(SQL_DELETE, entity.getId());             if (result != 1) log.trace("UserRepository.delete() with {} rows inserted", entity);             log.info("delete({}) result={}", entity, result);         } catch (DataAccessException exception) {             throw new DbException(exception);         }     } }

— Далее у нас идет логика бота, тут все тривиально, в отнаследованном onUpdateReceived методе от класса родителя TelegramLongPollingBot мы пишем поведение, которое происходит при обновлении чата с пользователем, подробнее об этом здесь, так же в методе обработки сообщений есть вызов нашего producer и запись данных в БД

TelegramBot
@Slf4j @Getter @Component public class TelegramBot extends TelegramLongPollingBot {      private Message requestMessage = new Message();     private final SendMessage response = new SendMessage();     private final Producer producerService;     private final UserService userService;      private final String botUsername;     private final String botToken;      public TelegramBot(             TelegramBotsApi telegramBotsApi,             @Value("${telegram-bot.name}") String botUsername,             @Value("${telegram-bot.token}") String botToken,             Producer producerService, UserService userService) throws TelegramApiException {         this.botUsername = botUsername;         this.botToken = botToken;         this.producerService = producerService;         this.userService = userService;          telegramBotsApi.registerBot(this);     }      /**      * Этот метод вызывается при получении обновлений через метод GetUpdates.      *      * @param request Получено обновление      */     @SneakyThrows     @Override     public void onUpdateReceived(Update request) {         requestMessage = request.getMessage();         response.setChatId(requestMessage.getChatId().toString());          var entity = new User(                 0, requestMessage.getChat().getUserName(),                 requestMessage.getText());          if (request.hasMessage() && requestMessage.hasText())             log.info("Working onUpdateReceived, request text[{}]", request.getMessage().getText());          if (requestMessage.getText().equals("/start"))             defaultMsg(response, "Напишите команду для показа списка мыслей: \n " + "/idea - показать мысли");         else if (requestMessage.getText().equals("/idea"))             onIdea(response);         else             defaultMsg(response, "Я записал вашу мысль :) \n ");          log.info("Working, text[{}]", requestMessage.getText());          if (requestMessage.getText().startsWith("/")) {             entity.setStartWord("команда: ");             producerService.sendMessage( entity);         } else {             entity.setStartWord("мысль: ");             producerService.sendMessage( entity);             userService.insert(entity);         }     }      /**      * Метод отправки сообщения со списком мыслей - по команде "/idea"      *      * @param response - метод обработки сообщения      */     private void onIdea(SendMessage response) throws TelegramApiException {         if (userService.getUserList().isEmpty()) {             defaultMsg(response, "В списке нет мыслей. \n");         } else {             defaultMsg(response, "Вот список ваших мыслей: \n");             for (User txt : userService.getUserList()) {                 response.setText(txt.toString());                 execute(response);             }         }     }      /**      * Шабонный метод отправки сообщения пользователю      *      * @param response - метод обработки сообщения      * @param msg - сообщение      */     private void defaultMsg(SendMessage response, String msg) throws TelegramApiException {         response.setText(msg);         execute(response);     } }
Фрагмент кода с отправкой в Kafka и записью в БД
        if (requestMessage.getText().startsWith("/")) {             entity.setStartWord("команда: ");             producerService.sendMessage( entity);         } else {             entity.setStartWord("мысль: ");             producerService.sendMessage( entity);             userService.insert(entity);         }

Переходим к созданию бизнес логики приложения

— BaseService — реализует базовые методы сервисов проекта

BaseService
public class BaseService {      /**      * Обёртка результата      *      * @param result результат      * @return результат      * @throws NotFoundException если результат null      */     public <T> T wrapResult(T result) {         if(result == null)             throw new NotFoundException();         return result;     }      /**      * Обёртка результата      *      * @param result результат      * @return результат      * @throws NotFoundException если результат null или пустой      */     public <T> List<T> wrapResults(List<T> result) {         if(result == null || result.size() == 0)             throw new NotFoundException();         return result;     }  }

— Класс UserService работает с нашим репозиторием IUserRepository и содержит в себе бизнес-логику работы с записями о событиях в БД

UserService
@Service @Slf4j @RequiredArgsConstructor public class UserService extends BaseService {      //beans     protected final IUserRepository repo;      /**      * Возвращает список записей      *      * @return список записей      * @throws DbException в случае ошибки БД      */     public List<User> getUserList() {         log.trace("#### getUserList() - working");         return wrapResults(repo.getUserList());     }      /**      * Возвращает список записей по id      *      * @throws DbException в случае ошибки БД      */     public User getById(int id) {         log.trace("#### getById() [id={}]", id);         return wrapResult(repo.getById(id));     }      /**      * Вставка новой записи      *      * @param entity новая запись      * @throws DbException в случае ошибки БД      */     public void insert(User entity) {         log.trace("#### insert() [entity={}]", entity);         repo.insert(entity);     }      /**      * Удаление записи      *      * @param entity удаляемая запись      * @throws DbException в случае ошибки БД      */     public void delete(User entity) {         log.trace("#### delete() [entity={}]", entity);         repo.delete(entity);     }  }

— Класс Producer, как раз тот класс, который шлет сообщения в топик users, а так же здесь мы можем изменять формат самого сообщения и данные, которые он отправляет

Producer
@Service @Slf4j public class Producer {      private static final String TOPIC = "users";     protected final IUserRepository repo;      @Autowired     private KafkaTemplate<String, String> kafkaTemplate;      public Producer(IUserRepository repo) {         this.repo = repo;     }      public void sendMessage(User user) {         if (user.getName() == null || user.getDescription().isEmpty()) log.info("#### Empty name/description message");         log.info("#### Producing message [user={}]", user);         kafkaTemplate.send(TOPIC, "Writing in log -> " + user);     } }

В конце класс, который собственно и запускает все наше приложене

WebHookApp
@Slf4j @SpringBootApplication public class WebHookApp {     public static void main(String[] args) {         SpringApplication.run(WebHookApp.class, args);     } }

Теперь мы замариновали все ингридиенты и подготовили блюдо к запеканию:

— Сначала проверим, запущена ли Kafka

запуск по команде - sudo su systemctl start kafka
запуск по команде — sudo su systemctl start kafka

— После, запускаем Conductor и видим, что у нас работет брокер сообщений, после запуска нашего приложения, тут появится топик users, в который будут лететь сообщения отправленные нашим producer

Запущенный брокер
Запущенный брокер

— Далее запускаем DBeaver и создаем 2 таблицы (log и user_table), вот схема создания таблиц:

CREATE TABLE public.log ( id serial4 NOT NULL, message varchar(500) NOT NULL, date_time date NOT NULL, topic varchar(100) NOT NULL, CONSTRAINT log_pkey PRIMARY KEY (id) );
CREATE TABLE public.user_table ( id serial4 NOT NULL, user_name varchar(100) NOT NULL, description varchar(500) NULL, CONSTRAINT user_table_pkey PRIMARY KEY (id) );
Схема БД public
Схема БД public
Вот как выглядят таблица log
Вот как выглядят таблица log
Вот как выглядит таблица user_table
Вот как выглядит таблица user_table

Отлично, блюдо запеклось и готово к подаче:

— Запускаем проект, проверяем, что все настроено и корректно работает

Spring logs
Spring logs

— Открываем телеграмм и пробуем на вкус нашего «Франкенштейна»

  • Пишем — /start и начинаем тест … Я в шоке, оно живое !

Общение с ботом в Телеграмм
Общение с ботом в Телеграмм

— Давайте посмотрим, что же нам написал Spring в логах и записались ли данные в Kafka и БД ?

Логи нашего бота, ошибок не наблюдается
  .   ____          _            __ _ _  /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \  \\/  ___)| |_)| | | | | || (_| |  ) ) ) )   '  |____| .__|_| |_|_| |_\__, | / / / /  =========|_|==============|___/=/_/_/_/  :: Spring Boot ::                (v2.4.2) 2022-01-15 16:46:19.248  INFO 412498 --- [           main] com.secretary.bot.WebHookApp             : The following profiles are active: bot 2022-01-15 16:46:19.291  WARN 412498 --- [kground-preinit] o.s.h.c.j.Jackson2ObjectMapperBuilder    : For Jackson Kotlin classes support please add "com.fasterxml.jackson.module:jackson-module-kotlin" to the classpath 2022-01-15 16:46:19.882  INFO 412498 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8081 (http) 2022-01-15 16:46:19.887  INFO 412498 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat] 2022-01-15 16:46:19.887  INFO 412498 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.41] 2022-01-15 16:46:19.956  INFO 412498 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext 2022-01-15 16:46:19.957  INFO 412498 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 678 ms 2022-01-15 16:46:20.013  INFO 412498 --- [           main] c.secretary.bot.config.DefaultDbConfig   : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"*****","password":"*****","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}] 2022-01-15 16:46:20.565  INFO 412498 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor' 2022-01-15 16:46:20.574 DEBUG 412498 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice 2022-01-15 16:46:20.598 DEBUG 412498 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping' 2022-01-15 16:46:20.619 DEBUG 412498 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Patterns [/webjars/**, /**] in 'resourceHandlerMapping' 2022-01-15 16:46:20.627 DEBUG 412498 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice 2022-01-15 16:46:20.702  INFO 412498 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path '' 2022-01-15 16:46:20.709  INFO 412498 --- [           main] com.secretary.bot.WebHookApp             : Started WebHookApp in 1.65 seconds (JVM running for 1.962) SSS2022-01-15 16:52:33.916  INFO 412498 --- [legram Executor] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:  acks = 1 batch.size = 16384 bootstrap.servers = [localhost:9092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-1 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] internal.auto.downgrade.txn.commit = true key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer  2022-01-15 16:52:33.947  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0 2022-01-15 16:52:33.948  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651 2022-01-15 16:52:33.948  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1642254753947 2022-01-15 16:52:34.056  INFO 412498 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: faKjxP6CTvGFeeVKJw 2022-01-15 16:54:01.115  INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting... 2022-01-15 16:54:01.188  INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed. 

— Как мы видим, сообщения отправленные Боту появились в БД

Записи в БД
Записи в БД

— Открыв кондуктор, перейдите во вкладку topics, после нажимаем на наш топик users

Вкладка topics
Вкладка topics

— Далее во вкладке нашего топика нажимаем на кнопку CONSUME DATA

Информация о топике users
Информация о топике users

— В открывшемся окне, ставим такие же настройки (самая важная из них это Start From — указывает, с какого момента показывать сообщения в Kafka, наша настройка — показывает все сообщения, включая отправленые ранее)

Настройки просмотра сообщений
Настройки просмотра сообщений

— Вот и все, теперь мы убедились, что сообщения благополучно прилетели в Kafka, записались в БД и не вызвали ошибок в приложении

Прилетевшие в Kafka сообщения
Прилетевшие в Kafka сообщения

Ну что же, большое всем спасибо за время, потраченное на прочтение данной статьи, жду вас во второй части этого туториала, где мы используем Consumer Kafka, с помощью которого будем обрабатывать прилетающие сообщения.


ссылка на оригинал статьи https://habr.com/ru/post/655329/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *