Python, MSA, Kafka

от автора

Всем привет! Сегодня микросервисная архитектура, что называется «на хайпе». Я перечитал достаточно много статей по данной тематике, но обнаружил, что среди всего прочего, не так много публикаций, объясняющих данную концепцию на конкретном примере (может, плохо искал). Сегодня я бы хотел пополнить ряды авторов и написать свою первую публикацию, не судите строго!

Оглавление

  1. Для кого эта статья?

  2. Краткое описание

  3. Структура

  4. FASTApi сервис (web)

  5. Currency сервис (Scrapy)

  6. Notify сервис (бот)

  7. Инфраструктура

  8. Заключение

Для кого эта статья?

Я надеюсь, что эта статья станет отличным и понятным примером для разработчика, только начинающего свой путь в микросервисах. Многие аспекты данной концепции требуют сбора большого количества информации из разных источников. При этом попутно возникающие вопросы часто сбивают с толку и отвлекают от первой конечной цели — «пощупать» своими руками взаимодействие нескольких независимых приложений, настроить между ними работу. Когда некое ядро понимания происходящего будет сформировано, полагаю, вы вернетесь к более узким и специфическим вопросам в этой теме, и понять их вам станет легче.

Краткое описание

Вашему вниманию представлена система, состоящая из 3-ех микросервисов:

  • Приложение на FASTApi;

  • Веб-парсер на Scrapy;

  • Телеграмм бот на Aiogram;

Что эта «система» собирается делать? Все достаточно просто — мы создадим end-point, который будет принимать в себя два параметра — телеграм ID и код валюты. Далее эти данные отправятся в парсер (Scrapy), который по полученному коду валюты, узнает ее курс в рублях. Напоследок, собранную информацию примет бот и отправит пользователю. Чтобы стало еще понятнее, предлагаю посмотреть на рис. 1:

Рис. 1. Схема работы микросервисов

Рис. 1. Схема работы микросервисов

В качестве языка программирования будем использовать Python, также нам понадобится Redis, Kafka и Zookeper. Чтобы «поднять» все это зло добро, прибегнем к Docker-у.

Структура

Для того, чтобы в дальнейшем читатель не «забуксовал» в попытке понять расположение того или иного файла или куска кода, считаю нужным пояснить как будет располагаться структура файлов и папок:

MSA/

docker‑compose.yml;

.env;

services/ (в ней будут находиться все наши микросервисы);

currency/ (сервис на Scrapy, ответственный за парсинг валют);

web/ (сервис на FASTApi, ответственный за получение входных данных);

notify/ (сервис с ботом, ответственный за отправку сообщения пользователю);

Конечно, у каждого сервиса должен быть свой .env файл. Однако здесь мы умышленно согрешим упростим и создадим один общий для всех. Также на первой строчке каждого блока кода, в качестве комментария будет располагаться путь к этому файлу.

Hidden text

Весь код для изучения/клонирования можно найти в этом репозитории. Однако я настоятельно рекомендую прописать все своими руками.

FASTApi сервис (web)

Начнем с самого главного — точки входа в нашу систему. Установим все необходимые зависимости, их, кстати, будем хранить в poetry. В web сервисе инициализируем окружение.

cd services/web && poetry init

И сразу установим зависимости

poetry add fastapi uvicorn aiokafka

Как уже говорилось раннее — мы будем использовать Apache Kafka в качестве брокера сообщений. В Python есть асинхронный пакет — aiokafka — им мы и воспользуемся.

Чтобы избежать «кучи-малы» и не мешать .py файлы с другими — внутри директории web создадим еще одну директорию app — и в ней уже будем располагать весь код.

Начнем с создания схем, здесь ничего сложного, мы ожидаем всего два параметра:

# services/web/app/schemas.py from pydantic import BaseModel   class Message(BaseModel):     currency_char_code: str     telegram_id: int

Опишем нужный нам end-point:

# services/web/app/main.py from fastapi import FastAPI from web.schemas import Message   app = FastAPI()   @app.post("/currency-info") async def send(message: Message):     return message.model_dump()

Пока что будем просто возвращать то, что передал пользователь, а логику с брокером добавим позже. Далее укажем необходимые настройки и переменные, которые в будущем будем использовать для взаимодействия с брокером:

# services/web/app/settings.py import os  KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")  PRODUCE_TOPIC = os.getenv("WEB_TOPIC")
# .env KAFKA_BOOTSTRAP_SERVERS=kafka:9092  WEB_TOPIC=web

WEB_TOPIC — топик, куда наш web-сервис будет отправлять сообщения. В будущем Scrapy сервис будет этот топик слушать.

KAFKA_BOOTSTRAP_SERVERS — адрес сервера брокера (их может быть несколько, в нашем случае всего один).

Теперь перейдем к написанию класса-оболочки над нашим «продьюсером»:

# services/web/app/producer.py from aiokafka import AIOKafkaProducer from web.app import settings import asyncio  event_loop = asyncio.get_event_loop()   class AIOWebProducer(object):     def __init__(self):         self.__producer = AIOKafkaProducer(             bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,             loop=event_loop,         )         self.__produce_topic = settings.PRODUCE_TOPIC      async def start(self) -> None:         await self.__producer.start()      async def stop(self) -> None:         await self.__producer.stop()      async def send(self, value: bytes) -> None:         await self.start()         try:             await self.__producer.send(                 topic=self.__produce_topic,                 value=value,             )         finally:             await self.stop()   def get_producer() -> AIOWebProducer:     return AIOWebProducer()

Кто такой «продьюсер»? Говоря простым языком — это специальный класс, который может взаимодействовать с брокером сообщений и предоставляет интерфейс отправки сообщения по нужному топику.

Мы воспользовались классом AIOKafkaProducer из модуля aiokafka. Он представляет из себя интерфейс взаимодействия с брокером от лица производителя (продьюсера) сообщений. В качестве параметров мы указали текущий цикл событий и KAFKA_BOOTSTRAP_SERVERS.

В нашем классе-обертке ничего сложного нет. Методы start и stop просто оболочки над одноименными методами класса AIOKafkaProducer. start — открывает соединение с кластером Kafka, stop — очищает все ожидающие данные и закрывает соединение.

Метод send — такая же надстройка, только она инкапсулирует создание и закрытие соединения перед отправкой сообщения по нужному топику. Также стоит отметить, что отправлять данные мы будем в виде байтов. Это следует из документации пакета и устройства самой Kafka.

Функция get_producer нужна для удобной инъекции продюсера в наш единственный end-point. Давайте как раз этим и займемся:

# web/app/main.py from __future__ import annotations from typing import TYPE_CHECKING from fastapi import FastAPI, Depends from web.app.schemas import Message from web.app.producer import get_producer import json  if TYPE_CHECKING:     from web.app.producer import AIOWebProducer   app = FastAPI()   @app.post("/currency-info") async def send(message: Message, producer: AIOWebProducer = Depends(get_producer)) -> None:     message_to_produce = json.dumps(message.model_dump()).encode(encoding="utf-8")     await producer.send(value=message_to_produce)

В целом, практически ничего не изменилось, кода действительно немного (2 строчки). Сначала мы сериализуем объект сообщения (переводим его в байты), после чего отправляем полученный набор байтов в назначенный топик при помощи метода send. Все.

Последнее, что нам осталось сделать для этого микросервиса — написать для него Dockerfile:

# services/web/Dockerfile  FROM python:3.11-slim-buster  WORKDIR /usr/src/web  ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 ENV PYTHONPATH=/usr/src  COPY poetry.lock pyproject.toml ./  RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root  COPY app app/  WORKDIR ./app

Currency сервис (Scrapy)

Как и до этого инициализируем окружение и добавим необходимые зависимости:

cd services/currency && poetry init poetry add scrapy redis aiokafka

В этот раз мы не будем создавать папку app, так как Scrapy сделает это за нас и даже лучше. Вместо этого инициализируем проект при помощи команды:

scrapy startproject currency

Давайте избавимся от лишней вложенности папок currency, немного раскроем матрешку и приведем ее к такому виду:

Рис. 2. Структура файлов Scrapy сервиса

Рис. 2. Структура файлов Scrapy сервиса

Теперь поговорим о том, как будет парситься информация и куда она будет попадать. Итак, в Scrapy есть «пауки», суть которых как раз парсить данные с чего-либо. После того как данные будут получены, мы положим их в Redis, чтобы на каждый запрос пользователя не ходить в интернет. Сгенеририруем базовую комплектацию «паука», для этого введем:

scrapy genspider currency_v1 https://www.cbr.ru/scripts/xml_daily.asp

Далее в папке spiders должен был появиться каркас нашего будущего «паука». Давайте немного подредактируем его код:

# services/currency/currency/spiders/currency_v1.py import scrapy   class CurrencyV1Spider(scrapy.Spider):     name = "currency_v1"     allowed_domains = ["www.cbr.ru"]     start_urls = ["https://www.cbr.ru/scripts/xml_daily.asp"]      def parse(self, response, **kwargs) -> None:         currencies = response.xpath("//ValCurs//Valute")         for currency in currencies:             print(currency)

Итак, чтобы убедиться, что «паук» действительно что-либо парсит, предлагаю запустить его и проверить, для этого пишем в консоль:

scrapy crawl currency_v1 

Если в терминале посыпалось кучу различной информации вида <Selector query='//ValCurs//Valute'... — отлично, давайте подключим Redis, чтобы эта информация где-то хранилась. Для этого, во-первых, обновим .env:

KAFKA_BOOTSTRAP_SERVERS=kafka:9092  WEB_TOPIC=web  REDIS_HOST=redis  REDIS_PORT=6379

А, во-вторых, добавим необходимые настройки, чтобы наш сервис подключался к Redis:

# services/currency/currency/settings.py import os  BOT_NAME = "currency" SPIDER_MODULES = ["currency.spiders"] NEWSPIDER_MODULE = "currency.spiders" ROBOTSTXT_OBEY = True REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7" TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor" FEED_EXPORT_ENCODING = "utf-8"  # --- REDIS SETTINGS  CURRENCY_REDIS_CACHE_TIME_IN_SECONDS = 60  REDIS_HOST = os.getenv("REDIS_HOST", "redis") REDIS_PORT = os.getenv("REDIS_PORT", "6379") REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}"

Я удалил абсолютно весь закомментированный автосгенерированный фреймворком код и добавил настройки для нашего NoSQL хранилища. Теперь в методе parse нашего паука будем сохранять поступившую информацию в Redis:

# services/currency/currency/spiders/currency_v1.py from currency.currency import settings import scrapy import redis import json   class CurrencyV1Spider(scrapy.Spider):     name = "currency_v1"     allowed_domains = ["www.cbr.ru"]     start_urls = ["https://www.cbr.ru/scripts/xml_daily.asp"]      def parse(self, response, **kwargs) -> None:         currencies = response.xpath("//ValCurs//Valute")         with redis.from_url(url=settings.REDIS_URL) as redis_client:             for currency in currencies:                 redis_client.set(                     name=self.get_currency_redis_key(currency),                     value=self.get_currency_redis_value(currency),                     ex=settings.CURRENCY_REDIS_CACHE_TIME_IN_SECONDS,                 )      @staticmethod     def get_currency_redis_key(selector) -> str:         return selector.xpath(".//CharCode//text()").get()      @staticmethod     def get_currency_redis_value(selector) -> bytes:         return json.dumps(             {                 "currency_value": selector.xpath(".//Value//text()").get()             }         ).encode("utf-8")

Теперь давайте создадим какой-нибудь класс-контроллер, с помощью которого мы сможем получить курс валюты по ее коду:

# services/currency/currency/controller.py from multiprocessing import Process, Queue from scrapy.crawler import CrawlerProcess from currency.currency import settings from currency.currency.spiders.currency_v1 import CurrencyV1Spider from redis import asyncio as aioredis import json   class CurrencyController(object):      def update_redis_cache(self) -> None:         queue = Queue()         process = Process(target=self._crawl_currency, args=(queue,))         process.start()         none_or_exception = queue.get()         process.join()         if none_or_exception is not None:             raise none_or_exception      @staticmethod     def _crawl_currency(queue: Queue) -> None:         try:             process = CrawlerProcess()             process.crawl(CurrencyV1Spider)             process.start()             queue.put(None)         except Exception as exc:             queue.put(exc)      async def get_currency_info(self, currency_char_code: str) -> dict:         if currency_info := await self.get_currency_from_redis(currency_char_code):             return currency_info         self.update_redis_cache()         return await self.get_currency_from_redis(currency_char_code)          @staticmethod     async def get_currency_from_redis(char_code: str) -> dict | None:         async with aioredis.from_url(settings.REDIS_URL) as redis_client:             if currency_info := await redis_client.get(name=char_code):                 return json.loads(currency_info)

Можно не обращать внимание на танцы с бубном вокруг multiprocessing, просто Scrapy не позволяет запустить паук в том же самом процессе, поэтому для каждого запуска мы создаем новый. Возможно (и скорее всего) это не самое лучшее решение, однако я допускаю его в рамках данной статьи для упрощения. Итак, возвращаемся к брокеру.

В этом сервисе нам необходим как производитель, так и потребитель. Потребитель будет слушать на входящие сообщения от web сервиса, производитель отправлять notify сервису. Добавим необходимые настройки:

# services/currency/currency/settings.py # --- SCRAPY SETTINGS ...  # --- REDIS SETTINGS ...  # --- KAFKA SETTINGS  PRODUCE_TOPIC = os.getenv("CURRENCY_TOPIC") CONSUME_TOPIC = os.getenv("WEB_TOPIC") KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")

Не забываем про .env:

KAFKA_BOOTSTRAP_SERVERS=kafka:9092  WEB_TOPIC=web  REDIS_HOST=redis  REDIS_PORT=6379  CURRENCY_TOPIC=currency

И пишем код в main.py для прослушивания и отправки сообщений посредством брокера:

# services/currency/currency/main.py  import json import asyncio  from aiokafka import AIOKafkaConsumer, AIOKafkaProducer from currency.currency import settings from currency.currency.controller import CurrencyController   async def start_consumer() -> AIOKafkaConsumer:     consumer = AIOKafkaConsumer(         settings.CONSUME_TOPIC,         bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,     )     await consumer.start()     return consumer   async def start_producer() -> AIOKafkaProducer:     producer = AIOKafkaProducer(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)     await producer.start()     return producer   async def main() -> None:     controller = CurrencyController()     consumer = await start_consumer()     producer = await start_producer()     try:         async for message in consumer:             decoded_message = json.loads(message.value)             currency_info = await controller.get_currency_info(                 currency_char_code=decoded_message.get("currency_char_code"),             ) or dict()             currency_info["telegram_id"] = decoded_message["telegram_id"]             info_to_send = json.dumps(currency_info).encode(encoding="utf-8")             await producer.send(topic=settings.PRODUCE_TOPIC, value=info_to_send)     finally:         await consumer.stop()         await producer.stop()   if __name__ == '__main__':     asyncio.run(main()) 

Здесь мы инициализируем производителя и потребителя, слушаем на входящие сообщения, как только оно появляется — парсим информацию о валюте и отправляем уже телеграмм боту. Еще раз: слушаем WEB_TOPIC, а производим в CURRENCY_TOPIC .

Осталось написать Dockerfile:

# services/currency/Dockerfile FROM python:3.11-slim-buster  WORKDIR /usr/src/currency/  ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 ENV PYTHONPATH=/usr/src  COPY poetry.lock pyproject.toml ./  RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root  COPY currency currency/  WORKDIR ./currency

Notify сервис (бот)

Считаю нецелесообразно рассказывать в этой статье, как создавать своего телеграмм-бота. В интернете масса информации по этой теме. Начинаем мы с того, что у вас есть токен и вы знаете зачем он нужен.

Как и в предыдущем сервисе инициализируем poetry:

cd services/notify && poetry init

И добавим необходимые зависимости:

poetry add aiogram aiokafka

Создадим внутри директории notify новую директорию app, а в ней settings.py и добавим туда следующий код:

import os  BOT_TOKEN = os.getenv("BOT_TOKEN")  CONSUME_TOPIC = os.getenv("CURRENCY_TOPIC")  KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")

Из интересного здесь то, что в качестве прослушиваемого топика, мы используем CURRENCY_TOPIC, куда отправляет сообщение наш микро-сервис на Scrapy. Также не забудьте в .env файл добавить переменную BOT_TOKEN и указать в ней токен вашего бота. Файл с переменными окружениями по итогу должен выглядеть примерно так:

KAFKA_BOOTSTRAP_SERVERS=kafka:9092  WEB_TOPIC=web  REDIS_HOST=redis  REDIS_PORT=6379  CURRENCY_TOPIC=currency  BOT_TOKEN=#токен

Далее рядом создадим main.py, в нем будет запуск бота и прослушивание входящих сообщений:

from aiogram import Bot from aiogram import Dispatcher from aiokafka import AIOKafkaConsumer from notify.app import settings  import asyncio import json  dispatcher = Dispatcher() BOT = Bot(token=settings.BOT_TOKEN)   async def consume() -> None:     consumer = AIOKafkaConsumer(         settings.CONSUME_TOPIC,         bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,     )     await consumer.start()     try:         async for msg in consumer:             serialized = json.loads(msg.value)             await BOT.send_message(                 chat_id=serialized.get("telegram_id"),                 text=serialized.get("currency_value") or "Валюта не найдена",             )     finally:         await consumer.stop()   async def main() -> None:     polling = asyncio.create_task(dispatcher.start_polling(BOT))     consuming = asyncio.create_task(consume())     await asyncio.gather(polling, consuming)     print("Bot has successfully started polling")   if __name__ == "__main__":     asyncio.run(main())

Будем отделять мух от котлет и обращать внимание на код, связанный с потреблением сообщений.

  1. Мы воспользовались классом AIOKafkaConsumer, куда передали топик, который мы слушаем, и BOOTSTRAP_SERVERS.

  2. Асинхронная функция consume состоит глобально из трех частей: устанавливаем соединение с брокером — await consumer.start(), слушаем на входящие сообщения — async for msg in consumer. Если сообщение есть — десериализуем его и отправляем пользователю в его телеграмм при помощи метода send_message.

Как вы поняли, здесь отсутствует «продьюсер», так как это конечная точка нашей системы, откуда сообщение уже отправится пользователю в телеграмм бот.

Также напишем боту Dockerfile, он почти ничем не отличается от предыдущего:

# services/notify/Dockerfile FROM python:3.11-slim-buster  WORKDIR /usr/src/notify  ENV PYTHONDONTWRITEBYTECODE 1  ENV PYTHONUNBUFFERED 1  ENV PYTHONPATH=/usr/src  COPY poetry.lock pyproject.toml ./  RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root  COPY app app/  WORKDIR ./app

Инфраструктура

Для того, чтобы развернуть все наши шестеренки, опишем docker-compose.yml:

Hidden text
version: '3.3'  services:   web:     restart: on-failure     build:       context: services/web/       dockerfile: Dockerfile     container_name: "web"     ports:       - "8000:8000"     environment:       KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS       WEB_TOPIC: $WEB_TOPIC     command: uvicorn main:app --reload --host 0.0.0.0 --port 8000    currency:     restart: on-failure     build:       context: services/currency/       dockerfile: Dockerfile     container_name: "currency"     environment:       KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS       WEB_TOPIC: $WEB_TOPIC       CURRENCY_TOPIC: $CURRENCY_TOPIC       REDIS_HOST: $REDIS_HOST       REDIS_PORT: $REDIS_PORT     command: python main.py    bot:     restart: on-failure     build:       context: services/notify/       dockerfile: Dockerfile     container_name: "notify"     environment:       KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS       CURRENCY_TOPIC: $CURRENCY_TOPIC       BOT_TOKEN: $BOT_TOKEN     command: python main.py    zookeeper:     image: 'bitnami/zookeeper:3.7.0'     container_name: zookeeper     ports:       - '2181:2181'     environment:       - ALLOW_ANONYMOUS_LOGIN=yes      kafka:     image: 'bitnami/kafka:2.8.0'     container_name: kafka     ports:       - "9093:9093"     environment:       - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181       - ALLOW_PLAINTEXT_LISTENER=yes     depends_on:       - zookeeper    redis:     image: redis     container_name: redis     environment:       REDIS_HOST: REDIS_HOST     ports:       - "6379:6379" 

Момент истины — поднимаем:

docker compose up -d --build

Заранее следует проверить, что ничего не «отвалилось»:

docker ps

В выводе должно быть шесть рабочих контейнеров. Если это так — убедитесь, что вы начали диалог с ботом и, если оно тоже так, смело переходим по адресу и отправляем POST-запрос нашему единственному енд-поинту, куда передаем свой телеграмм ID и, допустим, USD. Ответом на это действие должно быть сообщение в телеграмме от нашего бота с одиноким и неприятным числом.

Заключение

Напоследок, хотелось бы сказать, что в данной статье намеренно пропущены некоторые моменты с настройкой Kafka, с ее переменными, которые можно указать в docker-compose.yml. Я не хотел перегружать статью информацией, так как, полагаю, что для человека, который будет работать с этим инструментом впервые, важнее получить практический опыт.

Я благодарен, если вы дочитали статью до этого момента. Надеюсь, я смог в понятной форме рассказать и, что главное, показать, как работает брокер на поверхностном уровне, не залезая в дебри. До новых встреч!


ссылка на оригинал статьи https://habr.com/ru/articles/793936/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *