Apache Kafka – Producer и Consumer. Простой пример Nodejs приложения

от автора

Привет! В продолжение темы изучения микросервисов решил разобраться с взаимодействием этих самых «сервисов», и написать простой пример взаимодействия двух сервисов между собой.

Перед чтением данной статьи, настоятельно рекомендую ознакомиться с данной статьей, по теме kafka (Kafka за 20 минут. Ментальная модель и как с ней работать)

Пример реализации можно найти тут…

И так, пример таков:

В сервисе пользователей есть метод для регистрации этих самых пользователей, где после регистрации, необходимо создать профиль для пользователя.

Конечно в представленном примере, отсутствует логика «ответов на вопросы» по типу:

  • Что если наш сервис недоступен?

  • Что если брокер недоступен?

  • … и прочие

Пример направлен исключительно на демонстрацию работы общения двух сервисов между собой, с помощью Apache Kafka.

Реализация

И так, в данном примере user‑service выступает в качестве producer — то есть отправителем событий, а profile‑service — выступает в качестве consumer — то есть слушающего входящие события.

Создадим 2 абсолютно одинаковых сервиса со следующими файлами

  1. Создадим директорию microservices — куда поместим наши сервисы

  2. Создадим файлы для сервиса профиля:
    Следует поместить их в отдельную директорию profile

    profile.service.js
    import Fastify from 'fastify'  const fastify = Fastify({     logger: true, })  fastify.listen({ port: 3001, host: "0.0.0.0" }, (err, address) => {     if (err) throw err     console.log("Profile service: Start Up!") })

    Dockerfile
    FROM node:22  WORKDIR /profile-microservice  COPY package.json . COPY yarn.lock .  RUN yarn install  COPY . .  EXPOSE 3001  CMD ["node", "profile.service.js"]

    package.json
    {   "name": "microservice-kafka-learn_profile",   "version": "1.0.0",   "license": "MIT",   "type": "module",   "dependencies": {     "@fastify/kafka": "^3.0.0",     "fastify": "^5.0.0"   } }

  3. Далее по тому же принципу что и для сервиса profile, создадим директорию и файловую структуру для сервиса user

Далее, в корне наше проекта создадим docker‑compose в котором подымим наши ранее созданный сервисы, а так же, сразу поместим туда нашу Apache Kafka

version: "3.8"  services:   zookeeper:     image: confluentinc/cp-zookeeper:latest     container_name: zookeeper     environment:       ZOOKEEPER_CLIENT_PORT: 2181       ZOOKEEPER_TICK_TIME: 2000    kafka_broker:     image: confluentinc/cp-kafka:latest     container_name: kafka     depends_on:       - zookeeper     ports:       - "9092:9092"     environment:       KAFKA_BROKER_ID: 1       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       KAFKA_LISTENERS: PLAINTEXT://kafka:9092       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1    user_microservice:     build: "./microservices/user"     ports:       - "3000:3000"     depends_on:       - kafka_broker    profile_microservice:     build: "./microservices/profile"     ports:       - "3001:3001"     depends_on:       - kafka_broker

На данном этапе, запуск docker‑compose должен привести вас к тому, что ваши сервисы user, profile, а так же kafka и zookeeper должны корректно работать.

Сценарий: «Пользователь успешно зарегистрирован»

Следующий шаг, давайте добавим в наш user.service.js файл подключение к Kafka

// ...  const fastify = Fastify({     logger: true, })  fastify.register(kafka, {     producer: {         'metadata.broker.list': 'kafka:9092',         'fetch.wait.max.ms': 10,         'fetch.error.backoff.ms': 50,         'client.id': 'user-service',         'dr_cb': true     }, });  // ...

Мы успешно подключились к нашей kafka, теперь давайте добавим метод «регистрации» пользователей

fastify.post('/user/register', async (request, reply) => {     // .....логика регистрации пользователя.....     // Считаем что тут у нас логика создания пользователя, которая прошла успешно     // ...      /*     * Отправьте событие для создания профиля для успешно зарегистрированного пользователя.     * */         fastify.kafka.push({         topic: "user_register",         payload: JSON.stringify({             id: Date.now(),             email: "user@example.com",             username: "imbatman"         }),         key: 'user_register_key'     })      reply.send({         message: "User successfully created!"     }) })

Что тут происходит?

  1. В данном коде, мы считаем что успешно зарегистрировали пользователя и приступаем к отправке сообщения в kafka

  2. Создаем название topic‘а, желательно вынести данные наименования топиков в константы, для дальнейшего использования их, к примеру, в других сервисах, как мы увидим в примере далее

  3. Формируем payload который хотим передать потенциальному получателю нашего события

  4. Формируем ключ нашего сообщения

  5. Отправляем!

  6. Отлично, ваши данные попали к брокеру!

Сценарий: «Создаем профиль пользователя»

Добавим слушателя нашего события для создания профиля в сервисе profile.service.js

Нам так же как и в сервисе user, необходимо подключиться к нашей Kafka в сервисе profile

import crypto from "node:crypto"  const groupId = crypto.randomBytes(20).toString('hex')  fastify.register(kafka, {     consumer: {         'metadata.broker.list': 'kafka:9092',         'fetch.wait.max.ms': 10,         'fetch.error.backoff.ms': 50,         'topic.metadata.refresh.interval.ms': 1000,         'group.id': groupId,     }, })

Перед подключением нам нужно сгенерировать идентификатор группы, он служит для идентификации группы потребителей (consumer). Все потребители с одинаковым group.id образуют одну группу и совместно потребляют сообщения из топиков, разделяя между собой партиции (Если вы прочли рекомендованную мной статью выше, понимаете о чем речь).

Далее, нам необходимо подписаться на наше событие регистрации пользователей, то есть — «user_register»

Добавим следующий код:

fastify.register(kafka, {     consumer: {         'metadata.broker.list': 'kafka:9092',         'fetch.wait.max.ms': 10,         'fetch.error.backoff.ms': 50,         'topic.metadata.refresh.interval.ms': 1000,         'group.id': groupId,     }, }).after((err) => {     if (err) throw err      fastify.kafka         .subscribe(["user_register"])         .on("user_register", (msg, commit) => {             const registeredUser = JSON.parse(msg.value.toString());              console.log(registeredUser) // Тут наш зарегистрированный юзверь             commit()         })      fastify.kafka.consume() })

Разберем что тут происходит?

  1. Для начала, мы подписываемся в нашем сервисе на все необходимые для данного сервиса события .subscribe(["user_register"]). В данном случае, этим событием является «user_register»

    Опять же, повторюсь, что ключи топиков, необходимо хранить вне всех сервисов, для их общего доступа. В данном случае, сделано ради простоты примера.

  2. Далее, мы устанавливаем обработчик события
    .on("user_register", (msg, commit) => {

  3. В теле данного события мы получаем результат отправленного нами события в сервисе user, то есть:

    {   id: "1726957487909",   email: "user@example.com",   username: "imbatman" }
  1. Нам остается обработать в нашем profile сервисе данное событие, и создать профиль пользователя, основываясь на предоставленных данных

  2. Далее, вызывая представленную в аргументе функцию commit, мы подтверждает получение и обработку сообщения Kafka, сигнализируя брокеру, что это сообщение может быть помечено как «прочитанное» в текущей группе потребителей, предотвращая его повторное получение

fastify.kafka.consume() в свою очередь, активирует процесс потребления сообщений из подписанных тем, в данном случае «user_register». После вызова этого метода, Fastify начнет обрабатывать входящие сообщения и передавать их в соответствующие обработчики событий, такие как on("user_register", ...)

Заключение

Надеюсь, на таком незамысловатом примере, мне в первую очередь, как человеку только что познакомившись с данной технологией, и вам, как читателям данной статьи, удалось уловить первые простые шаги и понимание взаимодействие субъектов в виде сервисов (как в данном примере) между собой!


ссылка на оригинал статьи https://habr.com/ru/articles/845118/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *