Событийно-ориентированные приложения с использованием Kafka и Python

от автора

В этой статье мы разработаем и реализуем событийно-ориентированное приложение с использованием Kafka в Python. Для примера мы возьмем заказ мебели в приложении типа IKEA. Это просто пример, а не то, что происходит на самом деле в IKEA.

Мы будем делать приложение на локальном компьютере, но для производственных сред вы можете использовать облачный провайдер, такой как AWS, GCP или Azure.

Давайте посмотрим, что у нас есть в приведенной выше архитектуре:

Frontend (внешний интерфейс). Это может быть мобильное или веб-приложение, где пользователь заказывает товар. Когда пользователь выбирает и заказывает мебель с помощью приложения, внешний интерфейс обращается к бэкэнду.

Orders Backend (бэкэнд заказов). Он принимает заказ из внешнего интерфейса со всеми данными, связанными с этим заказом, а затем записывает в Kafka тему под названием «order_details». Тема «order_details» будет содержать всю информацию, относящуюся к одному отдельному заказу. Это будет простой Python-файл. Вы можете задеплоить этот сервис вместе с остальными, как микросервисы в облако, используя, например, облачный запуск на GCP или Lambda на AWS.

Transactions Backend (бэкэнд транзакций). Подписывается на тему «order_details» в Kafka, поэтому всякий раз, когда кто-то пишет в тему, бэкэнд транзакций будет читать сообщение и обрабатывать его в режиме реального времени. Бэкэнд транзакций будет выполнять обработку кредитных карт и некоторые другие проверки, чтобы убедиться, что заказ подтвержден. Как только заказ будет подтвержден, он отправит ответ в другую тему Kafka под названием «order_confirmed». Эта тема нужна, чтобы собирать все данные, которые относятся к подтвержденному заказу.

Email Backend (бэкэнд электронной почты). Подписывается на тему «order_confirmed» и отправляет пользователю электронное письмо с подтверждением, когда заказ подтвержден. Он также может отправить сообщение в тему, например «order_email_sent».

Analytics Backend (бэкэнд аналитики). Он подписывается на тему «order_confirmed» и выполняет по ней какую-то аналитику. Например, он может агрегировать общее количество заказов в этот день и общее количество доходов, полученных от разных заказов. Затем мы можем отправить результат аналитики по теме «analytics_result».

Dashboard. У нас может быть служба для получения каких-либо данных из разных тем и отправки их на панель инструментов для визуализации. Здесь мы просто используем один сервис для них обоих в Python для простоты, но вы можете легко их разделить.

В этом посте нам понадобятся следующее:

kafka-python flask

Чтобы запустить Kafka локально, можно использовать следующий компоновочный файл с кластером Kafka с одним брокером, а также одним zookeeper’ом и некоторыми другими компонентами Kafka, такими как центр управления UI (UI control-center), реестр схем (schema-registry) и т.д.

## docker-compose-kafka.yml  version: "3"  services:   zookeeper:     image: confluentinc/cp-zookeeper:5.4.0     hostname: zookeeper     container_name: zookeeper     ports:       - "2181:2181"     environment:       ZOOKEEPER_CLIENT_PORT: 2181       ZOOKEEPER_TICK_TIME: 2000    broker:     image: confluentinc/cp-server:5.4.0     hostname: broker     container_name: broker     depends_on:       - zookeeper     ports:       - "9092:9092"       - "29093:29093"     environment:       KAFKA_BROKER_ID: 1       KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092       KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0       KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1       CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092       CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181       CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1       CONFLUENT_METRICS_ENABLE: "true"       CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"    kafka-tools:     image: confluentinc/cp-kafka:5.4.0     hostname: kafka-tools     container_name: kafka-tools     command: ["tail", "-f", "/dev/null"]     network_mode: "host"    schema-registry:     image: confluentinc/cp-schema-registry:5.4.0     hostname: schema-registry     container_name: schema-registry     depends_on:       - zookeeper       - broker     ports:       - "8081:8081"     environment:       SCHEMA_REGISTRY_HOST_NAME: schema-registry       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"    control-center:     image: confluentinc/cp-enterprise-control-center:5.4.0     hostname: control-center     container_name: control-center     depends_on:       - zookeeper       - broker       - schema-registry     ports:       - "9021:9021"     environment:       CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'       CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'       CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"       CONTROL_CENTER_REPLICATION_FACTOR: 1       CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1       CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1       CONFLUENT_METRICS_TOPIC_REPLICATION: 1       PORT: 9021

Также можно использовать Kafka-UI и Conduktor. UI-центр управления просто показывает сообщения, которые отправляются, когда UI-страница темы открыта. Для Kafka-UI вы можете добавить следующие коды в свой файл компоновки вместо части центра управления:

kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui ports: - "8080:8080" restart: always environment: - KAFKA_CLUSTERS_0_NAME=local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=broker:29092 - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181 - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT - KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://schema-registry:8081

Затем нам нужно запустить docker-compose -f docker-compose-kafka.yml up -d, чтобы запустить Kafka со всеми компонентами. Обратите внимание, что Kafka должна быть запущена, когда мы хотим протестировать внешние и внутренние службы, которым необходимо отправлять или получать данные из тем Kafka.

Мы можем проверить, все ли работает, с помощью docker-compose -f docker-compose-kafka.yml ps команды:

NAME COMMAND SERVICE STATUS PORTS broker "/etc/confluent/dock…" broker running 0.0.0.0:9092->9092/tcp, 0.0.0.0:29093->29093/tcp kafka-tools "tail -f /dev/null" kafka-tools running kafka-ui "/bin/sh -c 'java $J…" kafka-ui running 0.0.0.0:8080->8080/tcp schema-registry "/etc/confluent/dock…" schema-registry running 0.0.0.0:8081->8081/tcp zookeeper "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

В дополнение к вышеупомянутому файлу docker-compose для Kafka у нас будет еще один для наших микросервисов. Вы можете добавлять сервисы один за другим в файл docker-compose и тестировать их.

Обратите внимание, что нужно для подключения ваших producers и consumers к брокеру Kafka: 

  • Если вы используете ту же сеть, поместив микросервис в тот же файл docker-compose, что и Kafka. Или используя отдельный файл docker-compose и настроив сеть, как сеть Kafka, вы можете использовать broker:29092.

  • Если вы запускаете свой сервис локально на том же компьютере без его докеризации, вы можете использовать в своем коде его localhost:9092.

  • Если вы хотите запустить Kafka на одной машине, а свои службы на другой машине, вам нужно использовать в своем коде <kafka machine ip>:29093.

Начнем с backend-сервисов. Интерфейс мы добавим позже.

Бэкэнд заказов

Теперь давайте перейдем к backend’у заказов. Мы докеризуем приложение и помещаем его в отдельный файл docker-compose с именем docker-compose-services.yml и настраиваем сеть так же, как сеть Kafka.  orders_backend.py — это flask-приложение и выглядит оно следующим образом:

# orders_backend.py  import json import time  from kafka import KafkaProducer from flask import Flask, jsonify, request  ORDER_KAFKA_TOPIC = 'order_details' # KAFKA_SERVER_ADDRESS = 'localhost:9092' KAFKA_SERVER_ADDRESS = 'broker:29092' # KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`  app = Flask(__name__)  ## from inside docker compose network - when add the service to compose file -> orders_backend:v1 producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",                               value_serializer=lambda x: json.dumps(x).encode('utf-8'))  # post endpoint to get user id , order id, user email, and order details @app.route('/order', methods=['POST']) def order():     user_id = request.json['user_id']     order_id = request.json['order_id']     user_email = request.json['user_email']     order_details = request.json['order_details']     order = {}     order['user_id'] = user_id     order['order_id'] = order_id     order['user_email'] = user_email     order['order_details'] = order_details     order['time'] = time.time()     producer.send(ORDER_KAFKA_TOPIC, order)     print("Sent order details {} to kafka topic: {}".format(order, ORDER_KAFKA_TOPIC))     return jsonify(order)   if __name__ == '__main__':     app.run(host="0.0.0.0", port=5002, debug=True)

У него есть конечная точка публикации для получения заказа и публикации его в теме Kafka с именем «order_details».

Затем вы можете легко докеризовать этот сервис. Вот докерфайл:

FROM python:3.9.7-slim RUN pip install -U pip RUN pip install pipenv WORKDIR /app COPY [ "Pipfile", "Pipfile.lock", "./" ] RUN pipenv install - system - deploy COPY [ "orders_backend.py", "./" ] EXPOSE 5002 ENTRYPOINT ["python", "orders_backend.py"]

Затем мы можем создать образ, используя:

docker build -t orders_backend:v1 .

Файл docker-compose будет выглядеть следующим образом:

# docker-compose-services.yml  version: "1"  services:   orders_backend:     restart: always     image: orders_backend:v1     ports:       - "5002:5002"     networks:       - ikea-ordering-kafka_default  networks:   ikea-ordering-kafka_default:     external: true

Мы можем использовать postman для проверки:

Мы также можем видеть сообщения по теме в пользовательском интерфейсе:

Бэкэнд транзакций

Это простой сервис для прослушивания темы «order_details», выполнения какой-либо обработки данных и отправки подтвержденного сообщения в тему «order_confirmed»:

# transactions_backend.py  import json import time from kafka import KafkaConsumer, KafkaProducer  OERDER_KAFKA_TOPIC = 'order_details' ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed' # KAFKA_SERVER_ADDRESS = 'localhost:9092' KAFKA_SERVER_ADDRESS = 'broker:29092' # KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`  consumer = KafkaConsumer(OERDER_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",                             value_deserializer=lambda x: json.loads(x.decode('utf-8'))) producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))  while True:     for message in consumer:         print("Received order details: {}".format(message.value))         user_id = message.value['user_id']         order_id = message.value['order_id']         user_email = message.value['user_email']         order_details = message.value['order_details']         time = message.value['time']         ## do some suff on the order and check the confirmation         order_confirmed = {}         order_confirmed['user_id'] = user_id         order_confirmed['order_id'] = order_id         order_confirmed['user_email'] = user_email         order_confirmed['order_details'] = order_details         order_confirmed['time'] = time         order_confirmed['status'] = 'confirmed'         producer.send(ORDER_CONFIRMED_KAFKA_TOPIC, order_confirmed)         print("Sent order details {} to kafka topic: {}".format(order_confirmed, ORDER_CONFIRMED_KAFKA_TOPIC))

Докер-файл выглядит следующим образом:

FROM python:3.9.7-slim RUN pip install -U pip RUN pip install pipenv WORKDIR /app COPY [ "Pipfile", "Pipfile.lock", "./" ] RUN pipenv install - system - deploy COPY [ "transactions_backend.py", "./" ] ENTRYPOINT ["python", "transactions_backend.py"]

Вы можете создать этот образ и обновить docker-compose-services.yml файл:

# docker-compose-services.yml  version: "1"  services:   orders_backend:     restart: always     image: orders_backend:v1     ports:       - "5002:5002"     networks:       - ikea-ordering-kafka_default      transactions_backend:     restart: always     image: transactions_backend:v1     ports:       - "5003:5003"     networks:       - ikea-ordering-kafka_default  networks:   ikea-ordering-kafka_default:     external: true

Затем, ещё раз протестировав сервисы с помощью postman, мы можем увидеть сообщения, приходящие в тему:

Бэкэнд электронной почты

Код этой службы выглядит следующим образом:

# email_backend.py  import json import time  from kafka import KafkaConsumer, KafkaProducer # from flask import Flask, jsonify, request  ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed' EMAIL_SENT_KAFKA_TOPIC = 'order_email_sent' # KAFKA_SERVER_ADDRESS = 'localhost:9092' KAFKA_SERVER_ADDRESS = 'broker:29092' # KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`  producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))  consumer = KafkaConsumer(ORDER_CONFIRMED_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",                             value_deserializer=lambda x: json.loads(x.decode('utf-8')))   def send_email(user_id, order_id, user_email, order_details, time, status):     print("Sending email to user: {} with order details: {}".format(user_email, order_details))     # send email to user     # ...     # ...     # ...     # ...        return True   while True:     for message in consumer:         # read data from consumer and call the send_email() function         print("Received order details: {}".format(message.value))         user_id = message.value['user_id']         order_id = message.value['order_id']         user_email = message.value['user_email']         order_details = message.value['order_details']         time = message.value['time']         status = message.value['status']         email_send_status = send_email(user_id, order_id, user_email, order_details, time, status)         email_sent = {}         email_sent['user_id'] = user_id         email_sent['order_id'] = order_id         email_sent['user_email'] = user_email         email_sent['order_details'] = order_details         email_sent['time'] = time         email_sent['status'] = email_send_status         producer.send(EMAIL_SENT_KAFKA_TOPIC, email_sent)         print("Sent email details {} to kafka topic: {}".format(email_sent, EMAIL_SENT_KAFKA_TOPIC))

Докер-файл также похож на предыдущие с небольшими изменениями имени python-файла. Затем вы можете создать образ, обновить файл docker-compose и запустить его. После отправки нескольких новых сообщений через postman мы можем увидеть сообщения в теме в пользовательском интерфейсе:

Бэкэнд аналитики

Код следующего сервиса получает подтвержденный заказ и рассчитывает общее количество заказов и общий доход:

# analytics_backend.py  import json import time  from kafka import KafkaConsumer, KafkaProducer  ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed' ANALYTICS_KAFKA_TOPIC = 'analytics_result' # KAFKA_SERVER_ADDRESS = 'localhost:9092' KAFKA_SERVER_ADDRESS = 'broker:29092' # KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`  producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))  consumer = KafkaConsumer(ORDER_CONFIRMED_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",                             value_deserializer=lambda x: json.loads(x.decode('utf-8')))  total_revenue = 0 total_orders_count = 0  while True:     for message in consumer:         # read data from consumer and do some analytics on it         print("Received order details: {}".format(message.value))         order_details = message.value['order_details']         total_revenue += int(order_details['price'])         total_orders_count += 1         analytics = {}         analytics['total_revenue'] = total_revenue         analytics['total_orders_count'] = total_orders_count         producer.send(ANALYTICS_KAFKA_TOPIC, analytics)         print("Sent analytics details {} to kafka topic: {}".format(analytics, ANALYTICS_KAFKA_TOPIC))

Докер-файл снова точно такой же, как и предыдущий, но с небольшой модификацией. Создайте образ, обновите файл компоновки и, наконец, запустите его.

Мы можем отправить несколько новых сообщений через postman и увидеть сообщения в теме в пользовательском интерфейсе:

Вот и все на этот раз. Надеемся, вы получили общее представление о том, как использовать Kafka в своих проектах.

https://slurm.club/3Musuk6

А чтобы еще больше узнать о том, как разработчикам можно использовать Kafka в работе, вы можете прийти на наш курс «Apache Kafka для разработчиков». Это углублённый курс с практикой на Java или Golang и платформой Spring+Docker+Postgres, который переведёт вас на новый уровень владения инструментом.


ссылка на оригинал статьи https://habr.com/ru/companies/southbridge/articles/735262/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *