Мониторинг демон на Asyncio + Dependency Injector — руководство по применению dependency injection

от автора

Привет,

Я создатель Dependency Injector. Это dependency injection фреймворк для Python.

Это еще одно руководство по построению приложений с помощью Dependency Injector.

Сегодня хочу показать как можно построить асинхронный демон на базе модуля asyncio.

Руководство состоит из таких частей:

  1. Что мы будем строить?
  2. Проверка инструментов
  3. Структура проекта
  4. Подготовка окружения
  5. Логирование и конфигурация
  6. Диспетчер
  7. Мониторинг example.com
  8. Мониторинг httpbin.org
  9. Тесты
  10. Заключение

Завершенный проект можно найти на Github.

Для старта желательно иметь:

  • Начальные знания по asyncio
  • Общее представление о принципе dependency injection

Что мы будем строить?

Мы будем строить мониторинг демон, который будет следить за доступом к веб-сервисам.

Демон будет посылать запросы к example.com и httpbin.org каждые несколько секунд. При получении ответа он будет записывать в лог такие данные:

  • Код ответа
  • Количество байт в ответе
  • Время, затраченное на выполнение запроса

Проверка инструментов

Мы будем использовать Docker и docker-compose. Давайте проверим, что они установлены:

docker --version docker-compose --version 

Вывод должен выглядеть приблизительно так:

Docker version 19.03.12, build 48a66213fe docker-compose version 1.26.2, build eefe0d31 

Если Docker или docker-compose не установлены, их нужно установить перед тем как продолжить. Следуйте этим руководствам:

Инструменты готовы. Переходим к структуре проекта.

Структура проекта

Создаем папку проекта и переходим в нее:

mkdir monitoring-daemon-tutorial cd monitoring-daemon-tutorial 

Теперь нам нужно создать начальную структуру проекта. Создаем файлы и папки следуя структуре ниже. Все файлы пока будут пустыми. Мы наполним их позже.

Начальная структура проекта:

./ ├── monitoringdaemon/ │   ├── __init__.py │   ├── __main__.py │   └── containers.py ├── config.yml ├── docker-compose.yml ├── Dockerfile └── requirements.txt 

Начальная структура проекта готова. Мы расширим ее с следующих секциях.

Дальше нас ждет подготовка окружения.

Подготовка окружения

В этом разделе мы подготовим окружение для запуска нашего демона.

Для начала нужно определить зависимости. Мы будем использовать такие пакеты:

  • dependency-injector — dependency injection фреймворк
  • aiohttp — веб фреймворк (нам нужен только http клиент)
  • pyyaml — библиотека для парсинга YAML файлов, используется для чтения конфига
  • pytest — фреймворк для тестирования
  • pytest-asyncio — библиотека-помогатор для тестирования asyncio приложений
  • pytest-cov — библиотека-помогатор для измерения покрытия кода тестами

Добавим следующие строки в файл requirements.txt:

dependency-injector aiohttp pyyaml pytest pytest-asyncio pytest-cov 

И выполним в терминале:

pip install -r requirements.txt 

Далее создаем Dockerfile. Он будет описывать процесс сборки и запуска нашего демона. Мы будем использовать python:3.8-buster в качестве базового образа.

Добавим следующие строки в файл Dockerfile:

FROM python:3.8-buster  ENV PYTHONUNBUFFERED=1  WORKDIR /code COPY . /code/  RUN apt-get install openssl \  && pip install --upgrade pip \  && pip install -r requirements.txt \  && rm -rf ~/.cache  CMD ["python", "-m", "monitoringdaemon"] 

Последним шагом определим настройки docker-compose.

Добавим следующие строки в файл docker-compose.yml:

version: "3.7"  services:    monitor:     build: ./     image: monitoring-daemon     volumes:       - "./:/code" 

Все готово. Давайте запустим сборку образа и проверим что окружение настроено верно.

Выполним в терминале:

docker-compose build 

Процесс сборки может занять несколько минут. В конце вы должны увидеть:

Successfully built 5b4ee5e76e35 Successfully tagged monitoring-daemon:latest 

После того как процесс сборки завершен запустим контейнер:

docker-compose up 

Вы увидите:

Creating network "monitoring-daemon-tutorial_default" with the default driver Creating monitoring-daemon-tutorial_monitor_1 ... done Attaching to monitoring-daemon-tutorial_monitor_1 monitoring-daemon-tutorial_monitor_1 exited with code 0 

Окружение готово. Контейнер запускается и завершает работу с кодом 0.

Следующим шагом мы настроим логирование и чтение файла конфигурации.

Логирование и конфигурация

В этом разделе мы настроим логирование и чтение файла конфигурации.

Начнем с добавления основной части нашего приложения — контейнера зависимостей (дальше просто контейнера). Контейнер будет содержать все компоненты приложения.

Добавим первые два компонента. Это объект конфигурации и функция настройки логирования.

Отредактируем containers.py:

"""Application containers module."""  import logging import sys  from dependency_injector import containers, providers   class ApplicationContainer(containers.DeclarativeContainer):     """Application container."""      config = providers.Configuration()      configure_logging = providers.Callable(         logging.basicConfig,         stream=sys.stdout,         level=config.log.level,         format=config.log.format,     ) 

Мы использовали параметры конфигурации перед тем как задали их значения. Это принцип, по которому работает провайдер Configuration.

Сначала используем, потом задаем значения.

Настройки логирования будут содержаться в конфигурационном файле.

Отредактируем config.yml:

log:   level: "INFO"   format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s" 

Теперь определим функцию, которая будет запускать наш демон. Её обычно называют main(). Она будет создавать контейнер. Контейнер будет использован для чтения конфигурационного файла и вызова функции настройки логирования.

Отредактируем __main__.py:

"""Main module."""  from .containers import ApplicationContainer   def main() -> None:     """Run the application."""     container = ApplicationContainer()      container.config.from_yaml('config.yml')     container.configure_logging()   if __name__ == '__main__':     main() 

Контейнер — первый объект в приложении. Он используется для получения всех остальных объектов.

Логирование и чтение конфигурации настроено. В следующем разделе мы создадим диспетчер мониторинговых задач.

Диспетчер

Пришло время добавить диспетчер мониторинговых задач.

Диспетчер будет содержать список мониторинговых задач и контролировать их выполнение. Он будет выполнять каждую задачу в соответствии с расписанием. Класс Monitor — базовый класс для мониторинговых задач. Для создания конкретных задач нужно добавлять дочерние классы и реализовывать метод check().


Добавим диспетчер и базовый класс мониторинговой задачи.

Создадим dispatcher.py и monitors.py в пакете monitoringdaemon:

./ ├── monitoringdaemon/ │   ├── __init__.py │   ├── __main__.py │   ├── containers.py │   ├── dispatcher.py │   └── monitors.py ├── config.yml ├── docker-compose.yml ├── Dockerfile └── requirements.txt 

Добавим следующие строки в файл monitors.py:

"""Monitors module."""  import logging   class Monitor:      def __init__(self, check_every: int) -> None:         self.check_every = check_every         self.logger = logging.getLogger(self.__class__.__name__)      async def check(self) -> None:         raise NotImplementedError() 

и в файл dispatcher.py:

""""Dispatcher module."""  import asyncio import logging import signal import time from typing import List  from .monitors import Monitor   class Dispatcher:      def __init__(self, monitors: List[Monitor]) -> None:         self._monitors = monitors         self._monitor_tasks: List[asyncio.Task] = []         self._logger = logging.getLogger(self.__class__.__name__)         self._stopping = False      def run(self) -> None:         asyncio.run(self.start())      async def start(self) -> None:         self._logger.info('Starting up')          for monitor in self._monitors:             self._monitor_tasks.append(                 asyncio.create_task(self._run_monitor(monitor)),             )          asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)         asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)          await asyncio.gather(*self._monitor_tasks, return_exceptions=True)          self.stop()      def stop(self) -> None:         if self._stopping:             return          self._stopping = True          self._logger.info('Shutting down')         for task, monitor in zip(self._monitor_tasks, self._monitors):             task.cancel()         self._logger.info('Shutdown finished successfully')      @staticmethod     async def _run_monitor(monitor: Monitor) -> None:         def _until_next(last: float) -> float:             time_took = time.time() - last             return monitor.check_every - time_took          while True:             time_start = time.time()              try:                 await monitor.check()             except asyncio.CancelledError:                 break             except Exception:                 monitor.logger.exception('Error executing monitor check')              await asyncio.sleep(_until_next(last=time_start)) 

Диспетчер нужно добавить в контейнер.

Отредактируем containers.py:

"""Application containers module."""  import logging import sys  from dependency_injector import containers, providers  from . import dispatcher   class ApplicationContainer(containers.DeclarativeContainer):     """Application container."""      config = providers.Configuration()      configure_logging = providers.Callable(         logging.basicConfig,         stream=sys.stdout,         level=config.log.level,         format=config.log.format,     )      dispatcher = providers.Factory(         dispatcher.Dispatcher,         monitors=providers.List(             # TODO: add monitors         ),     ) 

Каждый компонент добавляется в контейнер.

В завершении нам нужно обновить функцию main(). Мы получим диспетчер из контейнера и вызовем его метод run().

Отредактируем __main__.py:

"""Main module."""  from .containers import ApplicationContainer   def main() -> None:     """Run the application."""     container = ApplicationContainer()      container.config.from_yaml('config.yml')     container.configure_logging()      dispatcher = container.dispatcher()     dispatcher.run()   if __name__ == '__main__':     main() 

Теперь запустим демон и проверим его работу.

Выполним в терминале:

docker-compose up 

Вывод должен выглядеть так:

Starting monitoring-daemon-tutorial_monitor_1 ... done Attaching to monitoring-daemon-tutorial_monitor_1 monitor_1  | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully monitoring-daemon-tutorial_monitor_1 exited with code 0 

Все работает верно. Диспетчер запускается и выключается так как мониторинговых задач нет.

К концу этого раздела каркас нашего демона готов. В следующем разделе мы добавим первую мониторинговую задачу.

Мониторинг example.com

В этом разделе мы добавим мониторинговую задачу, которая будет следить за доступом к http://example.com.

Мы начнем с расширения нашей модели классов новым типом мониторинговой задачи HttpMonitor.

HttpMonitor это дочерний класс Monitor. Мы реализуем метод check(). Он будет отправлять HTTP запрос и логировать полученный ответ. Детали выполнения HTTP запроса будут делегированы классу HttpClient.


Сперва добавим HttpClient.

Создадим файл http.py в пакете monitoringdaemon:

./ ├── monitoringdaemon/ │   ├── __init__.py │   ├── __main__.py │   ├── containers.py │   ├── dispatcher.py │   ├── http.py │   └── monitors.py ├── config.yml ├── docker-compose.yml ├── Dockerfile └── requirements.txt 

И добавим в него следующие строки:

"""Http client module."""  from aiohttp import ClientSession, ClientTimeout, ClientResponse   class HttpClient:      async def request(self, method: str, url: str, timeout: int) -> ClientResponse:         async with ClientSession(timeout=ClientTimeout(timeout)) as session:             async with session.request(method, url) as response:                 return response 

Далее нужно добавить HttpClient в контейнер.

Отредактируем containers.py:

"""Application containers module."""  import logging import sys  from dependency_injector import containers, providers  from . import http, dispatcher   class ApplicationContainer(containers.DeclarativeContainer):     """Application container."""      config = providers.Configuration()      configure_logging = providers.Callable(         logging.basicConfig,         stream=sys.stdout,         level=config.log.level,         format=config.log.format,     )      http_client = providers.Factory(http.HttpClient)      dispatcher = providers.Factory(         dispatcher.Dispatcher,         monitors=providers.List(             # TODO: add monitors         ),     ) 

Теперь мы готовы добавить HttpMonitor. Добавим его в модуль monitors.

Отредактируем monitors.py:

"""Monitors module."""  import logging import time from typing import Dict, Any  from .http import HttpClient   class Monitor:      def __init__(self, check_every: int) -> None:         self.check_every = check_every         self.logger = logging.getLogger(self.__class__.__name__)      async def check(self) -> None:         raise NotImplementedError()   class HttpMonitor(Monitor):      def __init__(             self,             http_client: HttpClient,             options: Dict[str, Any],     ) -> None:         self._client = http_client         self._method = options.pop('method')         self._url = options.pop('url')         self._timeout = options.pop('timeout')         super().__init__(check_every=options.pop('check_every'))      @property     def full_name(self) -> str:         return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url)      async def check(self) -> None:         time_start = time.time()          response = await self._client.request(             method=self._method,             url=self._url,             timeout=self._timeout,         )          time_end = time.time()         time_took = time_end - time_start          self.logger.info(             'Response code: %s, content length: %s, request took: %s seconds',             response.status,             response.content_length,             round(time_took, 3)         ) 

У нас все готово для добавления проверки http://example.com. Нам нужно сделать два изменения в контейнере:

  • Добавить фабрику example_monitor.
  • Передать example_monitor в диспетчер.

Отредактируем containers.py:

"""Application containers module."""  import logging import sys  from dependency_injector import containers, providers  from . import http, monitors, dispatcher   class ApplicationContainer(containers.DeclarativeContainer):     """Application container."""      config = providers.Configuration()      configure_logging = providers.Callable(         logging.basicConfig,         stream=sys.stdout,         level=config.log.level,         format=config.log.format,     )      http_client = providers.Factory(http.HttpClient)      example_monitor = providers.Factory(         monitors.HttpMonitor,         http_client=http_client,         options=config.monitors.example,     )      dispatcher = providers.Factory(         dispatcher.Dispatcher,         monitors=providers.List(             example_monitor,         ),     ) 

Провайдер example_monitor имеет зависимость от значений конфигурации. Давайте добавим эти значения:

Отредактируем config.yml:

log:   level: "INFO"   format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"  monitors:    example:     method: "GET"     url: "http://example.com"     timeout: 5     check_every: 5 

Все готово. Запускаем демон и проверяем работу.

Выполняем в терминале:

docker-compose up 

И видим подобный вывод:

Starting monitoring-daemon-tutorial_monitor_1 ... done Attaching to monitoring-daemon-tutorial_monitor_1 monitor_1  | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up monitor_1  | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check monitor_1  |     GET http://example.com monitor_1  |     response code: 200 monitor_1  |     content length: 648 monitor_1  |     request took: 0.067 seconds monitor_1  | monitor_1  | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check monitor_1  |     GET http://example.com monitor_1  |     response code: 200 monitor_1  |     content length: 648 monitor_1  |     request took: 0.073 seconds 

Наш демон может следить за наличием доступа к http://example.com.

Давайте добавим мониторинг https://httpbin.org.

Мониторинг httpbin.org

В этом разделе мы добавим мониторинговую задачу, которая будет следить за доступом к http://example.com.

Добавление мониторинговой задачи для https://httpbin.org будет сделать легче, так как все компоненты уже готовы. Нам просто нужно добавить новый провайдер в контейнер и обновить конфигурацию.

Отредактируем containers.py:

"""Application containers module."""  import logging import sys  from dependency_injector import containers, providers  from . import http, monitors, dispatcher   class ApplicationContainer(containers.DeclarativeContainer):     """Application container."""      config = providers.Configuration()      configure_logging = providers.Callable(         logging.basicConfig,         stream=sys.stdout,         level=config.log.level,         format=config.log.format,     )      http_client = providers.Factory(http.HttpClient)      example_monitor = providers.Factory(         monitors.HttpMonitor,         http_client=http_client,         options=config.monitors.example,     )      httpbin_monitor = providers.Factory(         monitors.HttpMonitor,         http_client=http_client,         options=config.monitors.httpbin,     )      dispatcher = providers.Factory(         dispatcher.Dispatcher,         monitors=providers.List(             example_monitor,             httpbin_monitor,         ),     ) 

Отредактируем config.yml:

log:   level: "INFO"   format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"  monitors:    example:     method: "GET"     url: "http://example.com"     timeout: 5     check_every: 5    httpbin:     method: "GET"     url: "https://httpbin.org/get"     timeout: 5     check_every: 5 

Запустим демон и проверим логи.

Выполним в терминале:

docker-compose up 

И видим подобный вывод:

Starting monitoring-daemon-tutorial_monitor_1 ... done Attaching to monitoring-daemon-tutorial_monitor_1 monitor_1  | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up monitor_1  | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check monitor_1  |     GET http://example.com monitor_1  |     response code: 200 monitor_1  |     content length: 648 monitor_1  |     request took: 0.077 seconds monitor_1  | monitor_1  | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check monitor_1  |     GET https://httpbin.org/get monitor_1  |     response code: 200 monitor_1  |     content length: 310 monitor_1  |     request took: 0.18 seconds monitor_1  | monitor_1  | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check monitor_1  |     GET http://example.com monitor_1  |     response code: 200 monitor_1  |     content length: 648 monitor_1  |     request took: 0.066 seconds monitor_1  | monitor_1  | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check monitor_1  |     GET https://httpbin.org/get monitor_1  |     response code: 200 monitor_1  |     content length: 310 monitor_1  |     request took: 0.126 seconds 

Функциональная часть завершена. Демон следит за наличием доступа к http://example.com и https://httpbin.org.

В следующем разделе мы добавим несколько тестов.

Тесты

Было бы неплохо добавить несколько тестов. Давайте сделаем это.

Создаем файл tests.py в пакете monitoringdaemon:

./ ├── monitoringdaemon/ │   ├── __init__.py │   ├── __main__.py │   ├── containers.py │   ├── dispatcher.py │   ├── http.py │   ├── monitors.py │   └── tests.py ├── config.yml ├── docker-compose.yml ├── Dockerfile └── requirements.txt 

и добавляем в него следующие строки:

"""Tests module."""  import asyncio import dataclasses from unittest import mock  import pytest  from .containers import ApplicationContainer   @dataclasses.dataclass class RequestStub:     status: int     content_length: int   @pytest.fixture def container():     container = ApplicationContainer()     container.config.from_dict({         'log': {             'level': 'INFO',             'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',         },         'monitors': {             'example': {                 'method': 'GET',                 'url': 'http://fake-example.com',                 'timeout': 1,                 'check_every': 1,             },             'httpbin': {                 'method': 'GET',                 'url': 'https://fake-httpbin.org/get',                 'timeout': 1,                 'check_every': 1,             },         },     })     return container   @pytest.mark.asyncio async def test_example_monitor(container, caplog):     caplog.set_level('INFO')      http_client_mock = mock.AsyncMock()     http_client_mock.request.return_value = RequestStub(         status=200,         content_length=635,     )      with container.http_client.override(http_client_mock):         example_monitor = container.example_monitor()         await example_monitor.check()      assert 'http://fake-example.com' in caplog.text     assert 'response code: 200' in caplog.text     assert 'content length: 635' in caplog.text   @pytest.mark.asyncio async def test_dispatcher(container, caplog, event_loop):     caplog.set_level('INFO')      example_monitor_mock = mock.AsyncMock()     httpbin_monitor_mock = mock.AsyncMock()      with container.example_monitor.override(example_monitor_mock), \             container.httpbin_monitor.override(httpbin_monitor_mock):          dispatcher = container.dispatcher()         event_loop.create_task(dispatcher.start())         await asyncio.sleep(0.1)         dispatcher.stop()      assert example_monitor_mock.check.called     assert httpbin_monitor_mock.check.called 

Для запуска тестов выполним в терминале:

docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon 

Должен получиться подобный результат:

platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1 rootdir: /code plugins: asyncio-0.14.0, cov-2.10.0 collected 2 items  monitoringdaemon/tests.py ..                                    [100%]  ----------- coverage: platform linux, python 3.8.3-final-0 ----------- Name                             Stmts   Miss  Cover ---------------------------------------------------- monitoringdaemon/__init__.py         0      0   100% monitoringdaemon/__main__.py         9      9     0% monitoringdaemon/containers.py      11      0   100% monitoringdaemon/dispatcher.py      43      5    88% monitoringdaemon/http.py             6      3    50% monitoringdaemon/monitors.py        23      1    96% monitoringdaemon/tests.py           37      0   100% ---------------------------------------------------- TOTAL                              129     18    86% 

Обратите внимание как в тесте test_example_monitor мы подменяем HttpClient моком с помощью метода .override(). Таким образом можно переопределить возвращаемое значения любого провайдера.

Такие же действия выполняются в тесте test_dispatcher для подмены моками мониторинговых задач.

Заключение

Мы построили мониторинг демон на базе asyncio применяя принцип dependency injection. Мы использовали Dependency Injector в качестве dependency injection фреймворка.

Преимущество, которое вы получаете с Dependency Injector — это контейнер.

Контейнер начинает окупаться, когда вам нужно понять или изменить структуру приложения. С контейнером это легко, потому что все компоненты приложения и их зависимости в одном месте:

"""Application containers module."""  import logging import sys  from dependency_injector import containers, providers  from . import http, monitors, dispatcher   class ApplicationContainer(containers.DeclarativeContainer):     """Application container."""      config = providers.Configuration()      configure_logging = providers.Callable(         logging.basicConfig,         stream=sys.stdout,         level=config.log.level,         format=config.log.format,     )      http_client = providers.Factory(http.HttpClient)      example_monitor = providers.Factory(         monitors.HttpMonitor,         http_client=http_client,         options=config.monitors.example,     )      httpbin_monitor = providers.Factory(         monitors.HttpMonitor,         http_client=http_client,         options=config.monitors.httpbin,     )      dispatcher = providers.Factory(         dispatcher.Dispatcher,         monitors=providers.List(             example_monitor,             httpbin_monitor,         ),     ) 


Контейнер как карта вашего приложения. Вы всегда знайте что от чего зависит.

Что дальше?

ссылка на оригинал статьи https://habr.com/ru/post/514384/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *