Привет, Хабр!
В этой статье рассмотрим, как на примере магазина котиков — кейса, где каждый заказ превращается в событие — создать событийно‑ориентированную систему обработки заказов с использованием Python, Kafka и Django REST Framework. Создадим REST API для приёма заказов, настроим Kafka‑продюсеры, консьюмеры и реализуем компенсационные транзакции по принципу Saga.
Почему событийно-ориентированная архитектура?
Прежде всего, декуплирование сервисов позволяет нам:
-
Разделить ответственность. Заказ, оплата, доставка — каждый модуль работает независимо, не мешая другим.
-
Масштабировать по необходимости. Если нагрузка растёт, можно масштабировать только ту часть, которая испытывает затруднения.
-
Повысить отказоустойчивость. Если один сервис падает, остальные продолжают работать, а сообщения хранятся в Kafka до обработки.
-
Облегчить интеграцию. Новые микросервисы легко подключаются через подписку на нужные топики.
Разберёмся, какие компоненты понадобятся:
-
Django REST Framework. Это «фасад» для клиентов, через который принимаются заказы.
-
Kafka. Надёжный брокер сообщений, который гарантирует, что данные передадутся между сервисами.
-
Микросервис заказов. Сюда входит логика приёма заказа, сохранение его в базе данных (или где угодно) и отправка события.
-
Сервисы‑подписчики. Они отвечают за обработку событий: от подтверждения оплаты до выполнения компенсационных транзакций.
Kafka
Kafka позволяет обрабатывать миллионы сообщений, хранить их до подтверждения и масштабироваться горизонтально.
Пример Kafka Producer на Python:
# kafka_producer.py import json from kafka import KafkaProducer import logging from typing import Any, Dict logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) def get_kafka_producer() -> KafkaProducer: """ Инициализируем KafkaProducer с базовыми настройками. В продакшене нужно добавить обработку ошибок подключения, настройки безопасности и т.д. """ try: producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'), retries=5, # Повторная отправка при сбоях linger_ms=10, # Небольшая задержка перед отправкой max_request_size=1048576 # Ограничение размера запроса (1MB) ) logger.info("KafkaProducer успешно инициализирован!") return producer except Exception as e: logger.error(f"Ошибка инициализации KafkaProducer: {e}") raise if __name__ == "__main__": producer = get_kafka_producer() order_event: Dict[str, Any] = { "order_id": 1001, "cat_id": 42, "quantity": 1, "customer_name": "Вася", "status": "CREATED" } try: future = producer.send('orders_topic', order_event) result = future.get(timeout=10) logger.info(f"Сообщение отправлено: топик {result.topic}, партиция {result.partition}") except Exception as e: logger.error(f"Ошибка отправки сообщения: {e}") finally: producer.flush()
Здесь понятно: создаём продюсера и отправляем сообщение. Конечно, для продакшена потребуется больше деталей — но это отличная отправная точка.
Django REST Framework
Теперь переходим к созданию микросервиса заказов. DRF‑приложение примет заказ, проведёт валидацию данных и отправит событие в Kafka.
Выполняем в терминале:
django-admin startproject catshop cd catshop python manage.py startapp orders
Создадим сериализатор заказа:
# orders/serializers.py from rest_framework import serializers class OrderSerializer(serializers.Serializer): cat_id = serializers.IntegerField(min_value=1) quantity = serializers.IntegerField(min_value=1) customer_name = serializers.CharField(max_length=255) def validate_cat_id(self, value: int) -> int: if value <= 0: raise serializers.ValidationError("ID котика должен быть положительным числом.") return value
Представление для создания заказа:
# orders/views.py import uuid import logging from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from .serializers import OrderSerializer from catshop.kafka_producer import get_kafka_producer logger = logging.getLogger(__name__) # Инициализация Kafka Producer один раз при старте приложения producer = get_kafka_producer() class OrderCreateView(APIView): """ API для создания нового заказа. """ def post(self, request, *args, **kwargs): serializer = OrderSerializer(data=request.data) if serializer.is_valid(): order_data = serializer.validated_data # Генерация уникального идентификатора заказа order_data["order_id"] = str(uuid.uuid4()) order_data["status"] = "CREATED" try: future = producer.send('orders_topic', order_data) result = future.get(timeout=10) logger.info(f"Сообщение отправлено: топик {result.topic}, партиция {result.partition}") except Exception as e: logger.error(f"Ошибка отправки события в Kafka: {e}") return Response( {"error": "Не удалось обработать заказ, попробуйте позже."}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) return Response(order_data, status=status.HTTP_201_CREATED) else: return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
Настроим маршруты:
# orders/urls.py from django.urls import path from .views import OrderCreateView urlpatterns = [ path('orders/', OrderCreateView.as_view(), name='order-create'), ]
И подключаем маршруты в основном файле проекта:
# catshop/urls.py from django.contrib import admin from django.urls import path, include urlpatterns = [ path('admin/', admin.site.urls), path('api/', include('orders.urls')), ]
Kafka Consumer – кто слушает события?
Чтобы система работала целиком, нужен компонент, который будет принимать сообщения из Kafka и обрабатывать их. Для этого реализуем Kafka Consumer с ручным подтверждением сообщений.
# kafka_consumer.py import json import logging from kafka import KafkaConsumer from typing import Any, Dict logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) def get_kafka_consumer(topic: str, group_id: str = "order_consumers") -> KafkaConsumer: """ Инициализация KafkaConsumer с ручным подтверждением. """ try: consumer = KafkaConsumer( topic, bootstrap_servers=['localhost:9092'], group_id=group_id, value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='earliest', enable_auto_commit=False # Ручное подтверждение для большей надежности ) logger.info("KafkaConsumer успешно инициализирован!") return consumer except Exception as e: logger.error(f"Ошибка инициализации KafkaConsumer: {e}") raise def process_order_event(event: Dict[str, Any]) -> None: """ Логика обработки события заказа. Здесь можно добавить вызовы платежной системы, обновление базы данных и прочие действия. """ try: order_id = event.get("order_id") status_event = event.get("status") logger.info(f"Обрабатываю заказ {order_id} со статусом {status_event}") if status_event == "CREATED": logger.info(f"Заказ {order_id} подтвержден – начинаем обработку платежа.") # Можно добавить вызов функции подтверждения заказа или уведомления другого сервиса elif status_event == "CANCELLED": logger.info(f"Заказ {order_id} отменён – запускаем компенсационные транзакции.") # Здесь реализуйте логику отмены и компенсационных транзакций else: logger.warning(f"Заказ {order_id} имеет неизвестный статус: {status_event}") except Exception as e: logger.error(f"Ошибка при обработке заказа {event.get('order_id')}: {e}") if __name__ == "__main__": consumer = get_kafka_consumer("orders_topic") try: for message in consumer: logger.info(f"Получено сообщение: {message.value}") process_order_event(message.value) consumer.commit() # Ручное подтверждение после успешной обработки except KeyboardInterrupt: logger.info("Остановка KafkaConsumer...") finally: consumer.close()
Вручную подтверждаем обработку каждого сообщения, чтобы не потерять ни один заказ.
Saga Pattern
В распределённых системах не избежать ошибок. Если, например, платеж не проходит, мы должны отменить заказ и выполнить необходимые компенсационные транзакции. Вот тут на помощь приходит Saga Pattern.
# saga_handler.py import logging from typing import Dict, Any logger = logging.getLogger(__name__) def handle_saga(event: Dict[str, Any]) -> None: """ Простейшая реализация Saga: если платеж не успешен, запускаем компенсационные транзакции. """ order_id = event.get("order_id") payment_status = event.get("payment_status", "FAILED") # По умолчанию считаем, что платеж не прошёл if payment_status != "SUCCESS": logger.info(f"Платеж для заказа {order_id} не прошёл – запускаем компенсационные транзакции.") cancel_order(order_id) else: logger.info(f"Платеж для заказа {order_id} успешен – заказ подтвержден.") def cancel_order(order_id: str) -> None: """ Функция компенсации: отменяет заказ и выполняет необходимые действия (например, возврат средств). """ try: logger.info(f"Заказ {order_id} отменён. Выполняются компенсационные транзакции...") # Здесь можно вызвать другой микросервис или обновить статус заказа в БД except Exception as e: logger.error(f"Ошибка при отмене заказа {order_id}: {e}")
Так сейвим целостность данных даже в случае сбоев.
Тестирование
Пример тестов для API заказа, написанных с использованием DRF.
Тесты для API заказа:
# orders/tests.py from django.urls import reverse from rest_framework.test import APITestCase class OrderAPITestCase(APITestCase): def test_create_order_success(self): url = reverse('order-create') data = { "cat_id": 10, "quantity": 2, "customer_name": "Мурка" } response = self.client.post(url, data, format='json') self.assertEqual(response.status_code, 201) self.assertIn("order_id", response.data) self.assertEqual(response.data.get("status"), "CREATED") def test_create_order_invalid_data(self): url = reverse('order-create') data = { "cat_id": -1, # Недопустимое значение "quantity": 0, "customer_name": "" } response = self.client.post(url, data, format='json') self.assertEqual(response.status_code, 400)
Тестируйте позитивные сценарии, а также ситуации с ошибками, таймаутами и сбоями в сети.
Был ли у вас опыт создания подобных систем? Делитесь своим опытом и идеями в комментариях.
20 февраля пройдёт открытый урок на тему «Практика аутентификации и авторизации в микросервисной архитектуре».
Разберём ключевые протоколы и стандарты, такие как OAuth2 и JWT, а также изучим практические прикладные инструменты для централизованного управления доступом. Если интересно, записыватесь на странице курса «Microservice Architecture».
ссылка на оригинал статьи https://habr.com/ru/articles/883996/
Добавить комментарий