В этой статье мы разработаем и реализуем событийно-ориентированное приложение с использованием Kafka в Python. Для примера мы возьмем заказ мебели в приложении типа IKEA. Это просто пример, а не то, что происходит на самом деле в IKEA.
Мы будем делать приложение на локальном компьютере, но для производственных сред вы можете использовать облачный провайдер, такой как AWS, GCP или Azure.

Давайте посмотрим, что у нас есть в приведенной выше архитектуре:
— Frontend (внешний интерфейс). Это может быть мобильное или веб-приложение, где пользователь заказывает товар. Когда пользователь выбирает и заказывает мебель с помощью приложения, внешний интерфейс обращается к бэкэнду.
— Orders Backend (бэкэнд заказов). Он принимает заказ из внешнего интерфейса со всеми данными, связанными с этим заказом, а затем записывает в Kafka тему под названием «order_details». Тема «order_details» будет содержать всю информацию, относящуюся к одному отдельному заказу. Это будет простой Python-файл. Вы можете задеплоить этот сервис вместе с остальными, как микросервисы в облако, используя, например, облачный запуск на GCP или Lambda на AWS.
— Transactions Backend (бэкэнд транзакций). Подписывается на тему «order_details» в Kafka, поэтому всякий раз, когда кто-то пишет в тему, бэкэнд транзакций будет читать сообщение и обрабатывать его в режиме реального времени. Бэкэнд транзакций будет выполнять обработку кредитных карт и некоторые другие проверки, чтобы убедиться, что заказ подтвержден. Как только заказ будет подтвержден, он отправит ответ в другую тему Kafka под названием «order_confirmed». Эта тема нужна, чтобы собирать все данные, которые относятся к подтвержденному заказу.
— Email Backend (бэкэнд электронной почты). Подписывается на тему «order_confirmed» и отправляет пользователю электронное письмо с подтверждением, когда заказ подтвержден. Он также может отправить сообщение в тему, например «order_email_sent».
— Analytics Backend (бэкэнд аналитики). Он подписывается на тему «order_confirmed» и выполняет по ней какую-то аналитику. Например, он может агрегировать общее количество заказов в этот день и общее количество доходов, полученных от разных заказов. Затем мы можем отправить результат аналитики по теме «analytics_result».
— Dashboard. У нас может быть служба для получения каких-либо данных из разных тем и отправки их на панель инструментов для визуализации. Здесь мы просто используем один сервис для них обоих в Python для простоты, но вы можете легко их разделить.
В этом посте нам понадобятся следующее:
kafka-python flask
Чтобы запустить Kafka локально, можно использовать следующий компоновочный файл с кластером Kafka с одним брокером, а также одним zookeeper’ом и некоторыми другими компонентами Kafka, такими как центр управления UI (UI control-center), реестр схем (schema-registry) и т.д.
## docker-compose-kafka.yml version: "3" services: zookeeper: image: confluentinc/cp-zookeeper:5.4.0 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-server:5.4.0 hostname: broker container_name: broker depends_on: - zookeeper ports: - "9092:9092" - "29093:29093" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181 CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 CONFLUENT_METRICS_ENABLE: "true" CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous" kafka-tools: image: confluentinc/cp-kafka:5.4.0 hostname: kafka-tools container_name: kafka-tools command: ["tail", "-f", "/dev/null"] network_mode: "host" schema-registry: image: confluentinc/cp-schema-registry:5.4.0 hostname: schema-registry container_name: schema-registry depends_on: - zookeeper - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181" control-center: image: confluentinc/cp-enterprise-control-center:5.4.0 hostname: control-center container_name: control-center depends_on: - zookeeper - broker - schema-registry ports: - "9021:9021" environment: CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181' CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" CONTROL_CENTER_REPLICATION_FACTOR: 1 CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 CONFLUENT_METRICS_TOPIC_REPLICATION: 1 PORT: 9021
Также можно использовать Kafka-UI и Conduktor. UI-центр управления просто показывает сообщения, которые отправляются, когда UI-страница темы открыта. Для Kafka-UI вы можете добавить следующие коды в свой файл компоновки вместо части центра управления:
kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui ports: - "8080:8080" restart: always environment: - KAFKA_CLUSTERS_0_NAME=local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=broker:29092 - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181 - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT - KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://schema-registry:8081
Затем нам нужно запустить docker-compose -f docker-compose-kafka.yml up -d, чтобы запустить Kafka со всеми компонентами. Обратите внимание, что Kafka должна быть запущена, когда мы хотим протестировать внешние и внутренние службы, которым необходимо отправлять или получать данные из тем Kafka.
Мы можем проверить, все ли работает, с помощью docker-compose -f docker-compose-kafka.yml ps команды:
NAME COMMAND SERVICE STATUS PORTS broker "/etc/confluent/dock…" broker running 0.0.0.0:9092->9092/tcp, 0.0.0.0:29093->29093/tcp kafka-tools "tail -f /dev/null" kafka-tools running kafka-ui "/bin/sh -c 'java $J…" kafka-ui running 0.0.0.0:8080->8080/tcp schema-registry "/etc/confluent/dock…" schema-registry running 0.0.0.0:8081->8081/tcp zookeeper "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
В дополнение к вышеупомянутому файлу docker-compose для Kafka у нас будет еще один для наших микросервисов. Вы можете добавлять сервисы один за другим в файл docker-compose и тестировать их.
Обратите внимание, что нужно для подключения ваших producers и consumers к брокеру Kafka:
-
Если вы используете ту же сеть, поместив микросервис в тот же файл docker-compose, что и Kafka. Или используя отдельный файл docker-compose и настроив сеть, как сеть Kafka, вы можете использовать
broker:29092. -
Если вы запускаете свой сервис локально на том же компьютере без его докеризации, вы можете использовать в своем коде его
localhost:9092. -
Если вы хотите запустить Kafka на одной машине, а свои службы на другой машине, вам нужно использовать в своем коде
<kafka machine ip>:29093.
Начнем с backend-сервисов. Интерфейс мы добавим позже.
Бэкэнд заказов
Теперь давайте перейдем к backend’у заказов. Мы докеризуем приложение и помещаем его в отдельный файл docker-compose с именем docker-compose-services.yml и настраиваем сеть так же, как сеть Kafka. orders_backend.py — это flask-приложение и выглядит оно следующим образом:
# orders_backend.py import json import time from kafka import KafkaProducer from flask import Flask, jsonify, request ORDER_KAFKA_TOPIC = 'order_details' # KAFKA_SERVER_ADDRESS = 'localhost:9092' KAFKA_SERVER_ADDRESS = 'broker:29092' # KAFKA_SERVER_ADDRESS = '47.93.191.241:29093` app = Flask(__name__) ## from inside docker compose network - when add the service to compose file -> orders_backend:v1 producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT", value_serializer=lambda x: json.dumps(x).encode('utf-8')) # post endpoint to get user id , order id, user email, and order details @app.route('/order', methods=['POST']) def order(): user_id = request.json['user_id'] order_id = request.json['order_id'] user_email = request.json['user_email'] order_details = request.json['order_details'] order = {} order['user_id'] = user_id order['order_id'] = order_id order['user_email'] = user_email order['order_details'] = order_details order['time'] = time.time() producer.send(ORDER_KAFKA_TOPIC, order) print("Sent order details {} to kafka topic: {}".format(order, ORDER_KAFKA_TOPIC)) return jsonify(order) if __name__ == '__main__': app.run(host="0.0.0.0", port=5002, debug=True)
У него есть конечная точка публикации для получения заказа и публикации его в теме Kafka с именем «order_details».
Затем вы можете легко докеризовать этот сервис. Вот докерфайл:
FROM python:3.9.7-slim RUN pip install -U pip RUN pip install pipenv WORKDIR /app COPY [ "Pipfile", "Pipfile.lock", "./" ] RUN pipenv install - system - deploy COPY [ "orders_backend.py", "./" ] EXPOSE 5002 ENTRYPOINT ["python", "orders_backend.py"]
Затем мы можем создать образ, используя:
docker build -t orders_backend:v1 .
Файл docker-compose будет выглядеть следующим образом:
# docker-compose-services.yml version: "1" services: orders_backend: restart: always image: orders_backend:v1 ports: - "5002:5002" networks: - ikea-ordering-kafka_default networks: ikea-ordering-kafka_default: external: true
Мы можем использовать postman для проверки:

Мы также можем видеть сообщения по теме в пользовательском интерфейсе:

Бэкэнд транзакций
Это простой сервис для прослушивания темы «order_details», выполнения какой-либо обработки данных и отправки подтвержденного сообщения в тему «order_confirmed»:
# transactions_backend.py import json import time from kafka import KafkaConsumer, KafkaProducer OERDER_KAFKA_TOPIC = 'order_details' ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed' # KAFKA_SERVER_ADDRESS = 'localhost:9092' KAFKA_SERVER_ADDRESS = 'broker:29092' # KAFKA_SERVER_ADDRESS = '47.93.191.241:29093` consumer = KafkaConsumer(OERDER_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT", value_deserializer=lambda x: json.loads(x.decode('utf-8'))) producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT", value_serializer=lambda x: json.dumps(x).encode('utf-8')) while True: for message in consumer: print("Received order details: {}".format(message.value)) user_id = message.value['user_id'] order_id = message.value['order_id'] user_email = message.value['user_email'] order_details = message.value['order_details'] time = message.value['time'] ## do some suff on the order and check the confirmation order_confirmed = {} order_confirmed['user_id'] = user_id order_confirmed['order_id'] = order_id order_confirmed['user_email'] = user_email order_confirmed['order_details'] = order_details order_confirmed['time'] = time order_confirmed['status'] = 'confirmed' producer.send(ORDER_CONFIRMED_KAFKA_TOPIC, order_confirmed) print("Sent order details {} to kafka topic: {}".format(order_confirmed, ORDER_CONFIRMED_KAFKA_TOPIC))
Докер-файл выглядит следующим образом:
FROM python:3.9.7-slim RUN pip install -U pip RUN pip install pipenv WORKDIR /app COPY [ "Pipfile", "Pipfile.lock", "./" ] RUN pipenv install - system - deploy COPY [ "transactions_backend.py", "./" ] ENTRYPOINT ["python", "transactions_backend.py"]
Вы можете создать этот образ и обновить docker-compose-services.yml файл:
# docker-compose-services.yml version: "1" services: orders_backend: restart: always image: orders_backend:v1 ports: - "5002:5002" networks: - ikea-ordering-kafka_default transactions_backend: restart: always image: transactions_backend:v1 ports: - "5003:5003" networks: - ikea-ordering-kafka_default networks: ikea-ordering-kafka_default: external: true
Затем, ещё раз протестировав сервисы с помощью postman, мы можем увидеть сообщения, приходящие в тему:

Бэкэнд электронной почты
Код этой службы выглядит следующим образом:
# email_backend.py import json import time from kafka import KafkaConsumer, KafkaProducer # from flask import Flask, jsonify, request ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed' EMAIL_SENT_KAFKA_TOPIC = 'order_email_sent' # KAFKA_SERVER_ADDRESS = 'localhost:9092' KAFKA_SERVER_ADDRESS = 'broker:29092' # KAFKA_SERVER_ADDRESS = '47.93.191.241:29093` producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT", value_serializer=lambda x: json.dumps(x).encode('utf-8')) consumer = KafkaConsumer(ORDER_CONFIRMED_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT", value_deserializer=lambda x: json.loads(x.decode('utf-8'))) def send_email(user_id, order_id, user_email, order_details, time, status): print("Sending email to user: {} with order details: {}".format(user_email, order_details)) # send email to user # ... # ... # ... # ... return True while True: for message in consumer: # read data from consumer and call the send_email() function print("Received order details: {}".format(message.value)) user_id = message.value['user_id'] order_id = message.value['order_id'] user_email = message.value['user_email'] order_details = message.value['order_details'] time = message.value['time'] status = message.value['status'] email_send_status = send_email(user_id, order_id, user_email, order_details, time, status) email_sent = {} email_sent['user_id'] = user_id email_sent['order_id'] = order_id email_sent['user_email'] = user_email email_sent['order_details'] = order_details email_sent['time'] = time email_sent['status'] = email_send_status producer.send(EMAIL_SENT_KAFKA_TOPIC, email_sent) print("Sent email details {} to kafka topic: {}".format(email_sent, EMAIL_SENT_KAFKA_TOPIC))
Докер-файл также похож на предыдущие с небольшими изменениями имени python-файла. Затем вы можете создать образ, обновить файл docker-compose и запустить его. После отправки нескольких новых сообщений через postman мы можем увидеть сообщения в теме в пользовательском интерфейсе:

Бэкэнд аналитики
Код следующего сервиса получает подтвержденный заказ и рассчитывает общее количество заказов и общий доход:
# analytics_backend.py import json import time from kafka import KafkaConsumer, KafkaProducer ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed' ANALYTICS_KAFKA_TOPIC = 'analytics_result' # KAFKA_SERVER_ADDRESS = 'localhost:9092' KAFKA_SERVER_ADDRESS = 'broker:29092' # KAFKA_SERVER_ADDRESS = '47.93.191.241:29093` producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT", value_serializer=lambda x: json.dumps(x).encode('utf-8')) consumer = KafkaConsumer(ORDER_CONFIRMED_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT", value_deserializer=lambda x: json.loads(x.decode('utf-8'))) total_revenue = 0 total_orders_count = 0 while True: for message in consumer: # read data from consumer and do some analytics on it print("Received order details: {}".format(message.value)) order_details = message.value['order_details'] total_revenue += int(order_details['price']) total_orders_count += 1 analytics = {} analytics['total_revenue'] = total_revenue analytics['total_orders_count'] = total_orders_count producer.send(ANALYTICS_KAFKA_TOPIC, analytics) print("Sent analytics details {} to kafka topic: {}".format(analytics, ANALYTICS_KAFKA_TOPIC))
Докер-файл снова точно такой же, как и предыдущий, но с небольшой модификацией. Создайте образ, обновите файл компоновки и, наконец, запустите его.
Мы можем отправить несколько новых сообщений через postman и увидеть сообщения в теме в пользовательском интерфейсе:

Вот и все на этот раз. Надеемся, вы получили общее представление о том, как использовать Kafka в своих проектах.
А чтобы еще больше узнать о том, как разработчикам можно использовать Kafka в работе, вы можете прийти на наш курс «Apache Kafka для разработчиков». Это углублённый курс с практикой на Java или Golang и платформой Spring+Docker+Postgres, который переведёт вас на новый уровень владения инструментом.
ссылка на оригинал статьи https://habr.com/ru/companies/southbridge/articles/735262/
Добавить комментарий