Пишем полезный сервис на Python для получении ИНН

от автора

В этой статье хочу рассказать о том, как написать полезный сервис, для получения ИНН по персональным данным (паспортные данные). ИНН физического лица получаем с использованием сайта https://service.nalog.ru/. Похожая функциональность, скорее всего, уже где-то была реализована. Основная идея статьи — поделиться опытом работы с Python в части создания законченного проекта с использованием контейнера зависимостей, создания слушателей для RabbitMQ и работой с базой данных MongoDB. Работа с клиентами сервиса реализована через RabbitMQ в режиме непрерывного чтения очереди, отправкой результата в выходную очередь. Сервис будет жить в Kubernetes, что требует наличие liveness и readiness проб. Для этого используется веб-сервер.

Фото by Christina Morillo from Pexels

Фото by Christina Morillo from Pexels

Общие сведения

Сервис будем реализовывать на Python 3.10 с использованием библиотек aio-pika, fastapi, pydantic, motor и других библиотек, которые будут указаны в pyproject.toml проекта. В качестве базы данных используем MongoDB 4+. Обращение к сервису налоговой выполняется при помощи библиотеки aiohttp. Проект размещён в публичном доступе на GitHub.

Приложение функционирует как слушатель входной очереди и веб-сервер для отдачи liveness и readiness-проб. При получении сообщения из очереди, из заголовка reply-to вычитывается имя выходной очереди, в которую будет направлен ответ. Обработка запроса передаётся в сервис, который проверяет наличие похожего запроса в базе данных. В случае отсутствия данных по клиенту, выполняется запрос к внешнему сервису. Внешний сервис может обработать какое-то количество сообщений без запроса капчи. После превышения лимитов, которые доподлинно не известны (но изменяются при общей повышенной нагрузке), сообщение помещается в мёртвую очередь и через указанное в настройках время возвращается в обработку.

Подготовительные работы для базы данных не требуется. При первом подключении к MongoDB будут созданы необходимые коллекции и индексы.

Контракт общения с сервисом

Определим контракт входного сообщения в формате JSON:

{  "requestId": str,  "firstName": str,  "lastName": str,  "middleName": str,  "birthDate": date,  "documentSerial": str,  "documentNumber": str,  "documentDate": date }

Все поля интуитивно понятны. Атрибут requestId должен быть уникален в пределах всех сообщений, имеет смысл передавать его как строковое представление GUID.

Имя выходной очереди может передаваться через поле reply-to заголовка сообщения.

Контракт выходного сообщения будет следующим:

{ "requestId": str, "inn": str, "cached": bool, "details": str, "elapsedTime": float }

В ответе будем отдавать код запроса, собственно ИНН, время, за которое отработал сервис и признак кэшированного ответа.

Структура проекта

Общая структура директорий проекта следующая.

src   |--inn_service     |--clients     |--connection_managers     |--core     |--infrastructure        |--controllers        |--handlers        |--http_server        |--queue_manager                          |--models         |--repositories     |--serializers     |--services       main.py .env.example .gitignore docker-compose.yaml pyproject.yaml

В корневой директории будут размещаться инструменты запуска проекта: docker-compose, make-файл запуска линтинга и тестов. Собственно проект размещён в src/inn_service и содержит:

  • clients — клиенты для подключения к действительным поставщикам данных (nalog.ru и прочие);

  • connection_managers — инфраструктурные подключения к базе данных, очередям;

  • core — общий код приложения (собственно приложение, контейнер);

  • infrastructure — менеджер обработчиков очередей, сами обработчики, инфраструктурные контроллеры;

  • models — модели приложения, DTO-объекты;

  • repositories — репозиторий для работы с базой данных;

  • serializers — сериализаторы входных запросов, данных для отправки в провайдер ИНН;

  • services — сервисы приложения.

Работу по созданию виртуального подключения переложим на PyCharm и poetry. Краткая команда установки: poetry install.

Настройки приложения

Начнём разработку с создания настроек приложения, используя BaseSettings из пакета pydantic.

В файле settings.py будут находиться настройки.

Класс настроек settings.py
class Settings(BaseSettings):       app_name: str = 'INN service'       app_request_retry_times: int  # Количество попыток обработки внешнего запроса       app_request_retry_sec: int  # Время задержки в секундах перед повторной обработкой запроса          http_host: str       http_port: int       http_handler: str = 'asyncio'          mongo_host: str       mongo_port: str       mongo_user: str       mongo_pass: str       mongo_name: str       mongo_rs: Optional[str] = None       mongo_auth: str       mongo_timeout_server_select: int = 5000          rabbitmq_host: str       rabbitmq_port: int       rabbitmq_user: str       rabbitmq_pass: str       rabbitmq_vhost: str       rabbitmq_exchange_type: str       rabbitmq_prefetch_count: int       rabbitmq_source_queue_name: str          client_nalog_url: str  # Адрес внешнего сервиса для получения ИНН       client_nalog_timeout_sec: int  # Таймаут ожидания ответа от сервиса       client_nalog_retries: int  # Количество попыток запросов к внешнему сервису       client_nalog_wait_sec: int  # Время ожидания между попытками client_nalog_retries          @property       def mongo_dsn(self) -> str:           mongo_dsn = 'mongodb://{}:{}@{}:{}/{}'.format(               self.mongo_user,               self.mongo_pass,               self.mongo_host,               self.mongo_port,               self.mongo_auth           )              if self.mongo_rs:               mongo_dsn += f'?replicaSet={self.mongo_rs}'              return mongo_dsn          @property       def rabbitmq_dsn(self) -> str:           return 'amqp://{}:{}@{}:{}/{}'.format(               self.rabbitmq_user,               self.rabbitmq_pass,               self.rabbitmq_host,               self.rabbitmq_port,               self.rabbitmq_vhost           )

Предлагаю не указывать значения по умолчанию для настроек. Если что-то пойдёт не так, то сразу увидим проблему. В этот момент можно подготовить сразу и файл .env.example, содержащий настройки по-умолчанию для сервиса.

Подключения к инфраструктуре

Создадим слой подключения к инфраструктуре rabbitmq, mongodb через компоненты aio-pika и motor:

poetry add motor aio-pika fast fastapi uvicorn injector

Слой подключения будет размещаться в connection_managers и предназначен для организации подключения к базе данных и менеджеру очередей. Добавим две миксины для создания механизма регистрации автозапуска и завершения приложения. Механизм автозапуска функций применяется при старте приложения для инициализации подключения к RabbitMQ и MongoDB, а также для создания индексов в коллекции базы данных. В случае возникновения ошибок при подключении, приложение не стартует и выдаётся ошибка в логи.

class StartupEventMixin(ABC):          @abstractmethod       def startup(self) -> Coroutine:           raise NotImplementedError         class ShutdownEventMixin(ABC):          @abstractmethod       def shutdown(self) -> Coroutine:           raise NotImplementedError

На примере RabbitConnectionManager продемонстрируем реализацию.

class RabbitConnectionManager(StartupEventMixin, ShutdownEventMixin, EventLiveProbeMixin):       def startup(self) -> Coroutine:           return self.create_connection()  def shutdown(self) -> Coroutine:           return self.close_connection()      async def create_connection(self) -> None:       self.logger.info('Create connection RabbitMQ')       try:           self._connection = await connect_robust(self._dsn)           self._connection.reconnect_callbacks.add(self.on_connection_restore)           self._connection.close_callbacks.add(self.on_close_connection)           self.connected = True       except ConnectionError as exc:           err_message = f'Rabbit connection problem: {exc}'           self.logger.error(err_message)           raise ConnectionError(err_message)      async def close_connection(self) -> None:       if self._connection:           await self._connection.close()  # ... некоторый код пропущен, полная версия на гитхабе  def on_close_connection(self, *args):       self.logger.error('Lost connection to RabbitMQ...')       self.connected = False      def on_connection_restore(self, *args):       self.logger.info('Connection to RabbitMQ has been restored...')       self._channel = None       self._exchange = None       self.connected = True

При подключении к RabbitMQ устанавливаются функции коллбэков для реагирования на потерю соединения и его восстановление.

Менеджер обработчиков

Менеджер обработчиков предназначен для управления слушателями (consumers) очередей. В проекте используется концепция «мёртвых очередей», которая позволяет отложить сообщение на некоторое время и вернуться к его обработке позже. Причиной для этого может являться долгий ответ от провайдера, временные ошибки провайдера, требование ввода капчи из-за нагрузки. Достаточно подробно механизм мёртвых очередей технически разобран в статье Отложенные ретраи силами RabbitMQ. Каждый обработчик очереди должен хранить и возвращать признак использования ретраев, время между возвратами в основную очередь на обработку, а также имя очереди, которую планирует слушать. Основной код обработчика находится в run_handler. От функции ожидается True при успешной обработке, либо непоправимой ошибке запроса (некорректное тело сообщения) и False, если запрос не удалось обработать, но следует повторить позднее.

Код базового обработчика:

class BaseHandler(ABC):       def __init__(               self,               settings: Settings,               logger: AppLogger,               rabbitmq_connection: RabbitConnectionManager       ) -> None:           self.settings = settings           self.logger = logger           self.rabbitmq_connection = rabbitmq_connection          @abstractmethod       def get_use_retry(self) -> bool:           raise NotImplementedError          def get_retry_ttl(self) -> int:           return 0          @abstractmethod       def get_source_queue(self) -> str:           raise NotImplementedError          def convert_seconds_to_mseconds(self, value: int) -> int:           return value * 1000          @abstractmethod       async def run_handler(               self,               message: dict,               request_id: Optional[str],               result_queue: Optional[str],               count_retry: Optional[int] = 0       ) -> bool:           raise NotImplementedError

Собственно единственный наследник класса RequestHandler, реализующий приём и обработку сообщения:

Класс RequestHandler
class RequestHandler(BaseHandler):       def __init__(               self,               settings: Settings,               logger: AppLogger,               rabbitmq_connection: RabbitConnectionManager,               service: InnService       ) -> None:           super().__init__(settings, logger, rabbitmq_connection)           self.source_queue_name = self.settings.rabbitmq_source_queue_name           self.retry_times = self.settings.app_request_retry_times           self.retry_sec = self.settings.app_request_retry_sec           self.service = service          def get_source_queue(self) -> str:           return self.source_queue_name          def get_use_retry(self) -> bool:           return True          def get_retry_ttl(self) -> int:           return self.retry_sec          async def run_handler(               self,               message: dict,               request_id: Optional[str],               result_queue: Optional[str],               count_retry: Optional[int] = 0       ) -> bool:           if count_retry > self.retry_times:               self.logger.warning(f'Request {request_id} was rejected by excess attempts {self.retry_times} times')               return True              self.logger.info(f'Get request {request_id} for response {result_queue}')              client_data = RequestSerializer.parse_obj(message)              response = await self.service.get_client_inn(client_data)              if result_queue:               json_message = response.dict()               await self.rabbitmq_connection.send_data_by_queue(json_message, result_queue)              return True

При получении сообщения проверяем количество повторного попадания в очередь через параметр count_retry. В случае превышения — отправляем статус обработки сообщения (ошибку) в выходную очередь и приостанавливаем обработку данного сообщения. RequestSerializer.parse_obj(message) не обёрнут в try…except блок потому как менеджер очередей контролирует ошибки преобразования сообщений ValidationError.

Работа с базой данных

Выбор на MongoDB пал из-за простоты использования, отсутствия миграций, гибкой схемы обработки данных. В задаче нет необходимости в хранении зависимых данных, оформлении связей между таблицами. Для работы с данными будем использовать паттерн Репозиторий.

В базовом репозитории расположены функции работы с данными, индексами в нотации Mongo, а в конкретных классах реализуем необходимые сервису функции. Создание индексов выполняется при старте приложения в фоновом режиме (флаг background), для чего используется имплементация миксины StartupEventMixin. Запросы набора данных поддерживают пагинацию и сортировку.

Конкретный класс создаётся на каждую отдельную коллекцию. В проекте один репозиторий для клиентских запросов. Модель для хранения данных находится в директории models и называется ClientDataModel. Клиентская модель создана с типизацией, поддерживаемой MongoDB (datetime вместо date), для атрибута created_at указана функция генерации значения по умолчанию через default_factory. Также в модель добавлена функция подсчёта времени обработки запроса elapsed_time и метод класса для создания объекта из клиентского запроса.

class ClientDataModel(BaseModel):       created_at: datetime = Field(default_factory=datetime.utcnow)       request_id: str       first_name: str       last_name: str       middle_name: str       birth_date: datetime       birth_place: str = Field(default='')       passport_num: str       document_date: datetime       executed_at: Optional[datetime]       inn: Optional[str]       error: Optional[str]          @classmethod       def create_from_request(cls, request: RequestMqSerializer) -> 'ClientDataModel':           return ClientDataModel(               request_id=request.request_id,               first_name=request.first_name,               last_name=request.last_name,               middle_name=request.middle_name,               birth_date=datetime.combine(request.birth_date, datetime.min.time()),               passport_num='{} {}'.format(request.document_serial, request.document_number),               document_date=datetime.combine(request.document_date, datetime.min.time()),           )          @property       def elapsed_time(self) -> float:           end = self.executed_at or datetime.utcnow()           return (end - self.created_at).total_seconds()
Код базового репозитория
class BaseRepository(StartupEventMixin):          def __init__(self, mongodb_connection_manager: MongoConnectionManager, setting: Settings) -> None:           self.mongodb_connection_manager = mongodb_connection_manager           self.db_name = setting.mongo_name          @property       def collection_name(self) -> str:           raise NotImplementedError          @property       def collection_indexes(self) -> Iterable[IndexDef]:           raise NotImplementedError          def startup(self) -> Coroutine:           return self.create_indexes()          async def create_index(self, field_name: str, sort_id: int) -> None:           connection = await self.mongodb_connection_manager.get_connection()           collection = connection[self.db_name][self.collection_name]           await collection.create_index([(field_name, sort_id), ], background=True)          async def create_indexes(self) -> None:           tasks = []           for index_item in self.collection_indexes:               tasks.append(self.create_index(index_item.name, index_item.sort))           asyncio.ensure_future(asyncio.gather(*tasks))          async def get_one_document(self, criteria: dict) -> Optional[dict]:           connection = await self.mongodb_connection_manager.get_connection()           collection = connection[self.db_name][self.collection_name]           return await collection.find_one(criteria)          async def get_list_document(               self,               criteria: dict,               sort_criteria: Optional[list] = None,               limit: Optional[int] = 0,               skip: Optional[int] = 0,       ) -> List[dict]:           if not sort_criteria:               sort_criteria = []           connection = await self.mongodb_connection_manager.get_connection()           cursor = connection[self.db_name][self.collection_name].find(               criteria,               limit=limit,               skip=skip,               sort=sort_criteria           )              result = list()           async for data in cursor:               result.append(data)           return result          async def save_document(self, data: dict) -> str:           connection = await self.mongodb_connection_manager.get_connection()           result = await connection[self.db_name][self.collection_name].insert_one(data)           return result.inserted_id          async def update_document(self, criteria: dict, data: dict) -> None:           connection = await self.mongodb_connection_manager.get_connection()           await connection[self.db_name][self.collection_name].update_one(criteria, {'$set': data})

Сервисный слой

Сервисный слой выполняет всю необходимую обработку с данными.

  • обращение в базу данных для поиска аналогичного запроса (request_id и паспортные данные);

  • отдаёт результат, если данные были найдены;

  • выполняет запрос к API;

  • сохраняет результат запроса в базу данных;

  • возвращает ответ.

В сервисном слое попытался абстрагироваться от работы с инфраструктурой. Возврат ответа производится в вызывающую функцию, которая должна знать куда вернуть ответ. В данном случае, менеджер очередей «знает» куда ему ответить благодаря наличию поля reply-to в заголовке запроса. Возвращаемое значение оформлено в виде DTO-объекта (RequestDTO).

Код класса InnService
class InnService:       def __init__(               self,               settings: Settings,               logger: AppLogger,               client: NalogApiClient,               storage: RequestRepository       ) -> None:           self.settings = settings           self.logger = logger           self.client = client           self.storage_repository = storage          async def get_client_inn_from_storage(self, client_data: RequestSerializer) -> Optional[RequestModel]:           client_passport = f'{client_data.document_serial} {client_data.document_number}'           client_request = await self.storage_repository.find_request(client_passport, client_data.request_id)           return client_request          def update_status(self, model: RequestModel, inn: str, error: str) -> None:           model.inn = inn           model.error = error          async def get_client_inn(self, client_data: RequestSerializer) -> RequestDTO:           """Получение клиентского ИНН"""           start_process = datetime.utcnow()           model = RequestModel.create_from_request(client_data)              # Получить данные из БД           existing_data = await self.get_client_inn_from_storage(client_data)           if existing_data:               elapsed_time = (datetime.utcnow() - start_process).total_seconds()               return RequestDTO(                   request_id=client_data.request_id,                   inn=existing_data.inn,                   elapsed_time=elapsed_time,                   cashed=True               )              # Сделать фактический запрос в Nalog API           request = NalogApiRequestSerializer.create_from_request(client_data)           error, result = None, ''           try:               result = await self.client.send_request_for_inn(request)           except NalogApiClientException as exception:               self.logger.error('Error request to Nalog api service', details=str(exception))               error = str(exception)              self.update_status(model, result, error)           await self.storage_repository.save_request(model)              return RequestDTO(               request_id=model.request_id,               inn=model.inn,               details=model.error,               elapsed_time=model.elapsed_time           )

Второй сервис в приложении — это сервис опроса инфраструктуры для health-check. Инфраструктурные менеджеры, которые необходимо мониторить, должны наследоваться от миксины EventLiveProbeMixin и реализовать функцию is_connected.

Клиент

Клиент NalogApiClient предназначен для выполнения POST запроса к https://service.nalog.ru/inn.do и разбора статуса ответа. Функция непосредственного оформления запроса обёрнута в retry декоратор повторителя запроса при возникновении ошибок. Настройки повторителя в общих настройках приложения.

class NalogApiClient:       CLIENT_EXCEPTIONS = (           NalogApiClientException,           aiohttp.ClientProxyConnectionError,           aiohttp.ServerTimeoutError,       )          def __init__(self, settings: Settings, logger: AppLogger):           self.nalog_api_service_url = settings.client_nalog_url           self.request_timeout = settings.client_nalog_timeout_sec           self.retries_times = settings.client_nalog_retries           self.retries_wait = settings.client_nalog_wait_sec           self.logger = logger           self.timeout = aiohttp.ClientTimeout(total=self.request_timeout)          @property       def _headers(self):           return {               "Accept": "application/json, text/javascript, */*; q=0.01",               "Accept-Language": "ru-RU,ru",               "Connection": "keep-alive",               "Origin": "https://service.nalog.ru",               "Referer": self.nalog_api_service_url,               "Sec-Fetch-Dest": "empty",               "Sec-Fetch-Mode": "cors",               "Sec-Fetch-Site": "same-origin",               "Sec-GPC": "1",               "X-Requested-With": "XMLHttpRequest",           }          async def send_request_for_inn(self, nalog_api_request: NalogApiRequestSerializer) -> Optional[str]:           self.logger.debug(f'Request to nalog api service for {nalog_api_request.client_fullname}')              form_data = nalog_api_request.dict(by_alias=True)              @retry(self.CLIENT_EXCEPTIONS, logger=self.logger, attempts=self.retries_times, wait_sec=self.retries_wait)           async def make_request(client_session: aiohttp.ClientSession):               async with client_session.post(url=self.nalog_api_service_url, data=form_data) as response:                   if response.status not in [http.HTTPStatus.OK, http.HTTPStatus.NOT_FOUND]:                       response_text = await response.text()                       raise NalogApiClientException(response_text)                   data = await response.json()                   code = data.get('code')                   captcha_required = data.get('captchaRequired')                   if captcha_required:                       raise NalogApiClientException(f'Captcha required for request {nalog_api_request.client_fullname}')                   if code == 0:                       return 'no inn'                   elif code == 1:                       return data.get('inn')                   else:                       raise NalogApiClientException(f'Unable to parse response! Details: {response}')              async with aiohttp.ClientSession(timeout=self.timeout, headers=self._headers) as session:               return await make_request(session)

Контейнер

Контейнер предназначен для сборки необходимых зависимостей и передачи их в приложение. Наш контейнер собран в классе ApplicationContainer. Все зависимости пробрасываются в виде синглтонов @singleton и регистрируются как провайдеры зависимостей типов @provider предоставляемых библиотекой injector. При написании тестов необходимо подготовить другой контейнер с актуальными fake или stub-объектами.

Основной интерес по работе с контейнером сосредоточен в классе ContainerManager, который используется для проверки реализации миксин EventSubscriberMixin и EventLiveProbeMixin. Функция get_event_collection формирует списки функций обратного вызова для старта и выхода из приложения. Проход по спискам и вызов функций обратного вызова реализован в функциях: run_startup и run_shutdown.

class ContainerManager:          def __init__(self, cls_container: Type[Container]) -> None:           self._container = Injector(cls_container())           self._bindings = self._container.binder._bindings          def get_container(self) -> Injector:           return self._container          def get_live_probe_handlers(self) -> List[Type[Callable]]:           result = []           binding_collection = [binding for binding in self._bindings]           for binding in binding_collection:               if issubclass(binding, EventLiveProbeMixin):                   binding_obj = self._container.get(binding)                   result.append(binding_obj.is_connected)           return result          def get_startup_handlers(self):           handlers = []           binding_collection = [binding for binding in self._bindings]           for binding in binding_collection:               if issubclass(binding, StartupEventMixin):                   binding_obj = self._container.get(binding)                   handlers.append(binding_obj.startup())           return handlers          def get_shutdown_handlers(self):           handlers = []           binding_collection = [binding for binding in self._bindings]           for binding in binding_collection:               if issubclass(binding, ShutdownEventMixin):                   binding_obj = self._container.get(binding)                   handlers.append(binding_obj.shutdown())           return handlers          async def run_startup(self) -> None:           exception = None           for handler in self.get_startup_handlers():               if exception:                   handler.close()               else:                   try:                       await handler                   except Exception as exc:                       exception = exc              if exception is not None:               raise exception          async def run_shutdown(self) -> None:           handlers = []           for handler in self.get_shutdown_handlers():               handlers.append(handler)           await asyncio.gather(*handlers)

Собственно сам контейнер, в котором производится инициализация нужных экземпляров классов. При написании тестов будет создан аналогичный контейнер.

class ApplicationContainer(Container):          @singleton       @provider         def provide_settings(self) -> Settings:           return Settings()      # ... немного кода пропущено      @singleton       @provider         def provide_mongodb_connection(self, settings: Settings, logger: AppLogger) -> MongoConnectionManager:           return MongoConnectionManager(settings, logger)          @singleton       @provider         def provide_rabbitmq_connection(self, settings: Settings, logger: AppLogger) -> RabbitConnectionManager:           return RabbitConnectionManager(settings, logger)          @singleton       @provider         def provide_nalog_api_client(self, settings: Settings, logger: AppLogger) -> NalogApiClient:           return NalogApiClient(settings, logger)          @singleton       @provider         def provide_request_repository(self, settings: Settings, mongo_connection: MongoConnectionManager) -> RequestRepository:           return RequestRepository(mongo_connection, settings)

Приложение

Основная задача приложения — собрать всё воедино и запустить общий поток выполнения. Код сборки приложения предельно простой, инициализацию классов выполняет менеджер контейнера. Сборка приложения выполняется следующими шагами:

  • получение контейнера, передача его в менеджер контейнеров;

  • инициализация event_loop;

  • добавление обработчиков для очередей;

  • запуск инициализаторов для инфраструктурного слоя (реализующих startup миксины);

  • запуск веб-сервера FastAPI для отдачи health-check;

  • включение глобального обработчика ошибок.

class Application:          def __init__(self, cls_container: Type[Container]) -> None:           self.loop = asyncio.get_event_loop()           self.container_manager = ContainerManager(cls_container)           self.container = self.container_manager.get_container()           self.settings = self.container.get(Settings)           self.logger = self.container.get(AppLogger)           self.live_probe_service = self.container.get(LiveProbeService)           self.queue_manager = self.container.get(QueueManager)           self.app_name = self.settings.app_name           self.http_server = None          def init_application(self):           self.http_server = ServerAPIManager(self.container)              request_handler = self.container.get(RequestHandler)           self.queue_manager.add_handler(request_handler)              live_probe_handlers = self.container_manager.get_live_probe_handlers()           for handler in live_probe_handlers:               self.live_probe_service.add_component(handler)          def run(self) -> None:           self.logger.info(f'Starting application {self.app_name}')              self.init_application()              try:               self.loop.run_until_complete(self.container_manager.run_startup())                  tasks = asyncio.gather(                   self.http_server.serve(),                   self.queue_manager.run_handlers_async(),               )               self.loop.run_until_complete(tasks)                  self.loop.run_forever()           except BaseException as exception:               exit(1)           finally:               self.loop.run_until_complete(self.container_manager.run_shutdown())                  self.loop.close()               self.logger.info('Application disabled')

Приложение стартует из main-скрипта с использованием небольшой библиотеки typer. Маленькая библиотека имеет возможность удобно обрабатывать параметры командной строки.

import typer   from core.application import Application   from app_container import ApplicationContainer         def main():       try:           application = Application(ApplicationContainer)           application.run()       except BaseException as exc:           typer.echo(f'Error starting application. Details: {str(exc)}')         if __name__ == "__main__":       typer.run(main)

Как это всё запустить?

Проект содержит файл docker-compose для сборки и запуска. Так же необходимо необходимо скопировать файл .env.example в файл .env .

docker compose build docker compose up

После выполнения этих команд, будет запущен экземпляр mongodb на 27017 порту и rabbitmq на 5672 порту с админкой на 15672. В административную панель RabbitMQ можно зайти по адресу http://localhost:15672. В разделе очередей необходимо создать новую очередь, в которую будут направляться результаты работы сервиса и прибиндить её к exchange по умолчанию (direct).

Продолжение следует

В статье рассмотрена тема разработки приложения на Python с использованием очередей, контейнером зависимостей и поддержкой health-check. Предлагаю обсудить архитектуру в комментариях, а затем продолжить развивать сервис. Следующими итерациями планирую добавить гипотетического не бесплатного клиента, которого будем использовать после определённого количества запросов в бесплатный сервис. И в завершении написать тесты.

Материалы, которые могут быть полезны для понимания материала:


ссылка на оригинал статьи https://habr.com/ru/articles/729396/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *