Всем привет! Сегодня микросервисная архитектура, что называется «на хайпе». Я перечитал достаточно много статей по данной тематике, но обнаружил, что среди всего прочего, не так много публикаций, объясняющих данную концепцию на конкретном примере (может, плохо искал). Сегодня я бы хотел пополнить ряды авторов и написать свою первую публикацию, не судите строго!
Оглавление
Для кого эта статья?
Я надеюсь, что эта статья станет отличным и понятным примером для разработчика, только начинающего свой путь в микросервисах. Многие аспекты данной концепции требуют сбора большого количества информации из разных источников. При этом попутно возникающие вопросы часто сбивают с толку и отвлекают от первой конечной цели — «пощупать» своими руками взаимодействие нескольких независимых приложений, настроить между ними работу. Когда некое ядро понимания происходящего будет сформировано, полагаю, вы вернетесь к более узким и специфическим вопросам в этой теме, и понять их вам станет легче.
Краткое описание
Вашему вниманию представлена система, состоящая из 3-ех микросервисов:
-
Приложение на FASTApi;
-
Веб-парсер на Scrapy;
-
Телеграмм бот на Aiogram;
Что эта «система» собирается делать? Все достаточно просто — мы создадим end-point, который будет принимать в себя два параметра — телеграм ID и код валюты. Далее эти данные отправятся в парсер (Scrapy), который по полученному коду валюты, узнает ее курс в рублях. Напоследок, собранную информацию примет бот и отправит пользователю. Чтобы стало еще понятнее, предлагаю посмотреть на рис. 1:
В качестве языка программирования будем использовать Python, также нам понадобится Redis, Kafka и Zookeper. Чтобы «поднять» все это зло добро, прибегнем к Docker-у.
Структура
Для того, чтобы в дальнейшем читатель не «забуксовал» в попытке понять расположение того или иного файла или куска кода, считаю нужным пояснить как будет располагаться структура файлов и папок:
MSA/
docker‑compose.yml;
.env;
services/ (в ней будут находиться все наши микросервисы);
currency/ (сервис на Scrapy, ответственный за парсинг валют);
web/ (сервис на FASTApi, ответственный за получение входных данных);
notify/ (сервис с ботом, ответственный за отправку сообщения пользователю);
Конечно, у каждого сервиса должен быть свой .env файл. Однако здесь мы умышленно согрешим упростим и создадим один общий для всех. Также на первой строчке каждого блока кода, в качестве комментария будет располагаться путь к этому файлу.
Hidden text
Весь код для изучения/клонирования можно найти в этом репозитории. Однако я настоятельно рекомендую прописать все своими руками.
FASTApi сервис (web)
Начнем с самого главного — точки входа в нашу систему. Установим все необходимые зависимости, их, кстати, будем хранить в poetry. В web сервисе инициализируем окружение.
cd services/web && poetry init
И сразу установим зависимости
poetry add fastapi uvicorn aiokafka
Как уже говорилось раннее — мы будем использовать Apache Kafka в качестве брокера сообщений. В Python есть асинхронный пакет — aiokafka — им мы и воспользуемся.
Чтобы избежать «кучи-малы» и не мешать .py файлы с другими — внутри директории web создадим еще одну директорию app — и в ней уже будем располагать весь код.
Начнем с создания схем, здесь ничего сложного, мы ожидаем всего два параметра:
# services/web/app/schemas.py from pydantic import BaseModel class Message(BaseModel): currency_char_code: str telegram_id: int
Опишем нужный нам end-point:
# services/web/app/main.py from fastapi import FastAPI from web.schemas import Message app = FastAPI() @app.post("/currency-info") async def send(message: Message): return message.model_dump()
Пока что будем просто возвращать то, что передал пользователь, а логику с брокером добавим позже. Далее укажем необходимые настройки и переменные, которые в будущем будем использовать для взаимодействия с брокером:
# services/web/app/settings.py import os KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS") PRODUCE_TOPIC = os.getenv("WEB_TOPIC")
# .env KAFKA_BOOTSTRAP_SERVERS=kafka:9092 WEB_TOPIC=web
WEB_TOPIC — топик, куда наш web-сервис будет отправлять сообщения. В будущем Scrapy сервис будет этот топик слушать.
KAFKA_BOOTSTRAP_SERVERS — адрес сервера брокера (их может быть несколько, в нашем случае всего один).
Теперь перейдем к написанию класса-оболочки над нашим «продьюсером»:
# services/web/app/producer.py from aiokafka import AIOKafkaProducer from web.app import settings import asyncio event_loop = asyncio.get_event_loop() class AIOWebProducer(object): def __init__(self): self.__producer = AIOKafkaProducer( bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, loop=event_loop, ) self.__produce_topic = settings.PRODUCE_TOPIC async def start(self) -> None: await self.__producer.start() async def stop(self) -> None: await self.__producer.stop() async def send(self, value: bytes) -> None: await self.start() try: await self.__producer.send( topic=self.__produce_topic, value=value, ) finally: await self.stop() def get_producer() -> AIOWebProducer: return AIOWebProducer()
Кто такой «продьюсер»? Говоря простым языком — это специальный класс, который может взаимодействовать с брокером сообщений и предоставляет интерфейс отправки сообщения по нужному топику.
Мы воспользовались классом AIOKafkaProducer из модуля aiokafka. Он представляет из себя интерфейс взаимодействия с брокером от лица производителя (продьюсера) сообщений. В качестве параметров мы указали текущий цикл событий и KAFKA_BOOTSTRAP_SERVERS.
В нашем классе-обертке ничего сложного нет. Методы start и stop просто оболочки над одноименными методами класса AIOKafkaProducer. start — открывает соединение с кластером Kafka, stop — очищает все ожидающие данные и закрывает соединение.
Метод send — такая же надстройка, только она инкапсулирует создание и закрытие соединения перед отправкой сообщения по нужному топику. Также стоит отметить, что отправлять данные мы будем в виде байтов. Это следует из документации пакета и устройства самой Kafka.
Функция get_producer нужна для удобной инъекции продюсера в наш единственный end-point. Давайте как раз этим и займемся:
# web/app/main.py from __future__ import annotations from typing import TYPE_CHECKING from fastapi import FastAPI, Depends from web.app.schemas import Message from web.app.producer import get_producer import json if TYPE_CHECKING: from web.app.producer import AIOWebProducer app = FastAPI() @app.post("/currency-info") async def send(message: Message, producer: AIOWebProducer = Depends(get_producer)) -> None: message_to_produce = json.dumps(message.model_dump()).encode(encoding="utf-8") await producer.send(value=message_to_produce)
В целом, практически ничего не изменилось, кода действительно немного (2 строчки). Сначала мы сериализуем объект сообщения (переводим его в байты), после чего отправляем полученный набор байтов в назначенный топик при помощи метода send. Все.
Последнее, что нам осталось сделать для этого микросервиса — написать для него Dockerfile:
# services/web/Dockerfile FROM python:3.11-slim-buster WORKDIR /usr/src/web ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 ENV PYTHONPATH=/usr/src COPY poetry.lock pyproject.toml ./ RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root COPY app app/ WORKDIR ./app
Currency сервис (Scrapy)
Как и до этого инициализируем окружение и добавим необходимые зависимости:
cd services/currency && poetry init poetry add scrapy redis aiokafka
В этот раз мы не будем создавать папку app, так как Scrapy сделает это за нас и даже лучше. Вместо этого инициализируем проект при помощи команды:
scrapy startproject currency
Давайте избавимся от лишней вложенности папок currency, немного раскроем матрешку и приведем ее к такому виду:
Теперь поговорим о том, как будет парситься информация и куда она будет попадать. Итак, в Scrapy есть «пауки», суть которых как раз парсить данные с чего-либо. После того как данные будут получены, мы положим их в Redis, чтобы на каждый запрос пользователя не ходить в интернет. Сгенеририруем базовую комплектацию «паука», для этого введем:
scrapy genspider currency_v1 https://www.cbr.ru/scripts/xml_daily.asp
Далее в папке spiders должен был появиться каркас нашего будущего «паука». Давайте немного подредактируем его код:
# services/currency/currency/spiders/currency_v1.py import scrapy class CurrencyV1Spider(scrapy.Spider): name = "currency_v1" allowed_domains = ["www.cbr.ru"] start_urls = ["https://www.cbr.ru/scripts/xml_daily.asp"] def parse(self, response, **kwargs) -> None: currencies = response.xpath("//ValCurs//Valute") for currency in currencies: print(currency)
Итак, чтобы убедиться, что «паук» действительно что-либо парсит, предлагаю запустить его и проверить, для этого пишем в консоль:
scrapy crawl currency_v1
Если в терминале посыпалось кучу различной информации вида <Selector query='//ValCurs//Valute'... — отлично, давайте подключим Redis, чтобы эта информация где-то хранилась. Для этого, во-первых, обновим .env:
KAFKA_BOOTSTRAP_SERVERS=kafka:9092 WEB_TOPIC=web REDIS_HOST=redis REDIS_PORT=6379
А, во-вторых, добавим необходимые настройки, чтобы наш сервис подключался к Redis:
# services/currency/currency/settings.py import os BOT_NAME = "currency" SPIDER_MODULES = ["currency.spiders"] NEWSPIDER_MODULE = "currency.spiders" ROBOTSTXT_OBEY = True REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7" TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor" FEED_EXPORT_ENCODING = "utf-8" # --- REDIS SETTINGS CURRENCY_REDIS_CACHE_TIME_IN_SECONDS = 60 REDIS_HOST = os.getenv("REDIS_HOST", "redis") REDIS_PORT = os.getenv("REDIS_PORT", "6379") REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}"
Я удалил абсолютно весь закомментированный автосгенерированный фреймворком код и добавил настройки для нашего NoSQL хранилища. Теперь в методе parse нашего паука будем сохранять поступившую информацию в Redis:
# services/currency/currency/spiders/currency_v1.py from currency.currency import settings import scrapy import redis import json class CurrencyV1Spider(scrapy.Spider): name = "currency_v1" allowed_domains = ["www.cbr.ru"] start_urls = ["https://www.cbr.ru/scripts/xml_daily.asp"] def parse(self, response, **kwargs) -> None: currencies = response.xpath("//ValCurs//Valute") with redis.from_url(url=settings.REDIS_URL) as redis_client: for currency in currencies: redis_client.set( name=self.get_currency_redis_key(currency), value=self.get_currency_redis_value(currency), ex=settings.CURRENCY_REDIS_CACHE_TIME_IN_SECONDS, ) @staticmethod def get_currency_redis_key(selector) -> str: return selector.xpath(".//CharCode//text()").get() @staticmethod def get_currency_redis_value(selector) -> bytes: return json.dumps( { "currency_value": selector.xpath(".//Value//text()").get() } ).encode("utf-8")
Теперь давайте создадим какой-нибудь класс-контроллер, с помощью которого мы сможем получить курс валюты по ее коду:
# services/currency/currency/controller.py from multiprocessing import Process, Queue from scrapy.crawler import CrawlerProcess from currency.currency import settings from currency.currency.spiders.currency_v1 import CurrencyV1Spider from redis import asyncio as aioredis import json class CurrencyController(object): def update_redis_cache(self) -> None: queue = Queue() process = Process(target=self._crawl_currency, args=(queue,)) process.start() none_or_exception = queue.get() process.join() if none_or_exception is not None: raise none_or_exception @staticmethod def _crawl_currency(queue: Queue) -> None: try: process = CrawlerProcess() process.crawl(CurrencyV1Spider) process.start() queue.put(None) except Exception as exc: queue.put(exc) async def get_currency_info(self, currency_char_code: str) -> dict: if currency_info := await self.get_currency_from_redis(currency_char_code): return currency_info self.update_redis_cache() return await self.get_currency_from_redis(currency_char_code) @staticmethod async def get_currency_from_redis(char_code: str) -> dict | None: async with aioredis.from_url(settings.REDIS_URL) as redis_client: if currency_info := await redis_client.get(name=char_code): return json.loads(currency_info)
Можно не обращать внимание на танцы с бубном вокруг multiprocessing, просто Scrapy не позволяет запустить паук в том же самом процессе, поэтому для каждого запуска мы создаем новый. Возможно (и скорее всего) это не самое лучшее решение, однако я допускаю его в рамках данной статьи для упрощения. Итак, возвращаемся к брокеру.
В этом сервисе нам необходим как производитель, так и потребитель. Потребитель будет слушать на входящие сообщения от web сервиса, производитель отправлять notify сервису. Добавим необходимые настройки:
# services/currency/currency/settings.py # --- SCRAPY SETTINGS ... # --- REDIS SETTINGS ... # --- KAFKA SETTINGS PRODUCE_TOPIC = os.getenv("CURRENCY_TOPIC") CONSUME_TOPIC = os.getenv("WEB_TOPIC") KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
Не забываем про .env:
KAFKA_BOOTSTRAP_SERVERS=kafka:9092 WEB_TOPIC=web REDIS_HOST=redis REDIS_PORT=6379 CURRENCY_TOPIC=currency
И пишем код в main.py для прослушивания и отправки сообщений посредством брокера:
# services/currency/currency/main.py import json import asyncio from aiokafka import AIOKafkaConsumer, AIOKafkaProducer from currency.currency import settings from currency.currency.controller import CurrencyController async def start_consumer() -> AIOKafkaConsumer: consumer = AIOKafkaConsumer( settings.CONSUME_TOPIC, bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, ) await consumer.start() return consumer async def start_producer() -> AIOKafkaProducer: producer = AIOKafkaProducer(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS) await producer.start() return producer async def main() -> None: controller = CurrencyController() consumer = await start_consumer() producer = await start_producer() try: async for message in consumer: decoded_message = json.loads(message.value) currency_info = await controller.get_currency_info( currency_char_code=decoded_message.get("currency_char_code"), ) or dict() currency_info["telegram_id"] = decoded_message["telegram_id"] info_to_send = json.dumps(currency_info).encode(encoding="utf-8") await producer.send(topic=settings.PRODUCE_TOPIC, value=info_to_send) finally: await consumer.stop() await producer.stop() if __name__ == '__main__': asyncio.run(main())
Здесь мы инициализируем производителя и потребителя, слушаем на входящие сообщения, как только оно появляется — парсим информацию о валюте и отправляем уже телеграмм боту. Еще раз: слушаем WEB_TOPIC, а производим в CURRENCY_TOPIC .
Осталось написать Dockerfile:
# services/currency/Dockerfile FROM python:3.11-slim-buster WORKDIR /usr/src/currency/ ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 ENV PYTHONPATH=/usr/src COPY poetry.lock pyproject.toml ./ RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root COPY currency currency/ WORKDIR ./currency
Notify сервис (бот)
Считаю нецелесообразно рассказывать в этой статье, как создавать своего телеграмм-бота. В интернете масса информации по этой теме. Начинаем мы с того, что у вас есть токен и вы знаете зачем он нужен.
Как и в предыдущем сервисе инициализируем poetry:
cd services/notify && poetry init
И добавим необходимые зависимости:
poetry add aiogram aiokafka
Создадим внутри директории notify новую директорию app, а в ней settings.py и добавим туда следующий код:
import os BOT_TOKEN = os.getenv("BOT_TOKEN") CONSUME_TOPIC = os.getenv("CURRENCY_TOPIC") KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
Из интересного здесь то, что в качестве прослушиваемого топика, мы используем CURRENCY_TOPIC, куда отправляет сообщение наш микро-сервис на Scrapy. Также не забудьте в .env файл добавить переменную BOT_TOKEN и указать в ней токен вашего бота. Файл с переменными окружениями по итогу должен выглядеть примерно так:
KAFKA_BOOTSTRAP_SERVERS=kafka:9092 WEB_TOPIC=web REDIS_HOST=redis REDIS_PORT=6379 CURRENCY_TOPIC=currency BOT_TOKEN=#токен
Далее рядом создадим main.py, в нем будет запуск бота и прослушивание входящих сообщений:
from aiogram import Bot from aiogram import Dispatcher from aiokafka import AIOKafkaConsumer from notify.app import settings import asyncio import json dispatcher = Dispatcher() BOT = Bot(token=settings.BOT_TOKEN) async def consume() -> None: consumer = AIOKafkaConsumer( settings.CONSUME_TOPIC, bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, ) await consumer.start() try: async for msg in consumer: serialized = json.loads(msg.value) await BOT.send_message( chat_id=serialized.get("telegram_id"), text=serialized.get("currency_value") or "Валюта не найдена", ) finally: await consumer.stop() async def main() -> None: polling = asyncio.create_task(dispatcher.start_polling(BOT)) consuming = asyncio.create_task(consume()) await asyncio.gather(polling, consuming) print("Bot has successfully started polling") if __name__ == "__main__": asyncio.run(main())
Будем отделять мух от котлет и обращать внимание на код, связанный с потреблением сообщений.
-
Мы воспользовались классом
AIOKafkaConsumer, куда передали топик, который мы слушаем, иBOOTSTRAP_SERVERS. -
Асинхронная функция
consumeсостоит глобально из трех частей: устанавливаем соединение с брокером —await consumer.start(), слушаем на входящие сообщения —async for msg in consumer. Если сообщение есть — десериализуем его и отправляем пользователю в его телеграмм при помощи методаsend_message.
Как вы поняли, здесь отсутствует «продьюсер», так как это конечная точка нашей системы, откуда сообщение уже отправится пользователю в телеграмм бот.
Также напишем боту Dockerfile, он почти ничем не отличается от предыдущего:
# services/notify/Dockerfile FROM python:3.11-slim-buster WORKDIR /usr/src/notify ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 ENV PYTHONPATH=/usr/src COPY poetry.lock pyproject.toml ./ RUN pip install poetry && poetry config virtualenvs.create false && poetry install --no-root COPY app app/ WORKDIR ./app
Инфраструктура
Для того, чтобы развернуть все наши шестеренки, опишем docker-compose.yml:
Hidden text
version: '3.3' services: web: restart: on-failure build: context: services/web/ dockerfile: Dockerfile container_name: "web" ports: - "8000:8000" environment: KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS WEB_TOPIC: $WEB_TOPIC command: uvicorn main:app --reload --host 0.0.0.0 --port 8000 currency: restart: on-failure build: context: services/currency/ dockerfile: Dockerfile container_name: "currency" environment: KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS WEB_TOPIC: $WEB_TOPIC CURRENCY_TOPIC: $CURRENCY_TOPIC REDIS_HOST: $REDIS_HOST REDIS_PORT: $REDIS_PORT command: python main.py bot: restart: on-failure build: context: services/notify/ dockerfile: Dockerfile container_name: "notify" environment: KAFKA_BOOTSTRAP_SERVERS: $KAFKA_BOOTSTRAP_SERVERS CURRENCY_TOPIC: $CURRENCY_TOPIC BOT_TOKEN: $BOT_TOKEN command: python main.py zookeeper: image: 'bitnami/zookeeper:3.7.0' container_name: zookeeper ports: - '2181:2181' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:2.8.0' container_name: kafka ports: - "9093:9093" environment: - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes depends_on: - zookeeper redis: image: redis container_name: redis environment: REDIS_HOST: REDIS_HOST ports: - "6379:6379"
Момент истины — поднимаем:
docker compose up -d --build
Заранее следует проверить, что ничего не «отвалилось»:
docker ps
В выводе должно быть шесть рабочих контейнеров. Если это так — убедитесь, что вы начали диалог с ботом и, если оно тоже так, смело переходим по адресу и отправляем POST-запрос нашему единственному енд-поинту, куда передаем свой телеграмм ID и, допустим, USD. Ответом на это действие должно быть сообщение в телеграмме от нашего бота с одиноким и неприятным числом.
Заключение
Напоследок, хотелось бы сказать, что в данной статье намеренно пропущены некоторые моменты с настройкой Kafka, с ее переменными, которые можно указать в docker-compose.yml. Я не хотел перегружать статью информацией, так как, полагаю, что для человека, который будет работать с этим инструментом впервые, важнее получить практический опыт.
Я благодарен, если вы дочитали статью до этого момента. Надеюсь, я смог в понятной форме рассказать и, что главное, показать, как работает брокер на поверхностном уровне, не залезая в дебри. До новых встреч!
ссылка на оригинал статьи https://habr.com/ru/articles/793936/
Добавить комментарий