Асинхронный JSON-Logger для FastAPI

от автора

Всем привет.

Цель данной статьи и моей личной разработки — написать о том, как я придумал свой собственный и удобный формат json-журналирования специально для компании, в которой я работаю, но не нашел готовых решений для реализации, который позволит мне воплощать очень гибко и удобно некоторые вещи с единым наименованием полей, чтобы логгирование протекало быстро и асинхронно, а также: не заставляло бы меня писать много кода и обойтись одной-двумя библиотеками максимум.

Пошел искать решение в лоб: искал готовые решения в виде сторонних библиотек. Отыскал пару-тройку годных решений. Например библиотека json-logging. Всё хорошо: почитал, что делает и поставил не глядя. Данная библиотека имеет массу преимуществ:

  • может напрямую работать с FastAPI, Flask, Aiohttp, Sanic,

  • перехватывает uvicorn access log,

  • производит форматирование запроса в JSON формат.

Данная библиотека генерирует прекрасный лог сообщения с полями запроса, который поступил в сервис:

Код
{ "type": "request", "written_at": "2017-12-23T16:55:37.280Z", "written_ts": 1514048137280721000, "component_id": "-", "component_name": "-", "component_instance_idx": 0, "correlation_id": "1975a02e-e802-11e7-8971-28b2bd90b19a", "remote_user": "user_a", "request": "/index.html", "referer": "-", "x_forwarded_for": "-", "protocol": "HTTP/1.1", "method": "GET", "remote_ip": "127.0.0.1", "request_size_b": 1234, "remote_host": "127.0.0.1", "remote_port": 50160, "request_received_at": "2017-12-23T16:55:37.280Z", "response_time_ms": 0, "response_status": 200, "response_size_b": "122", "response_content_type": "text/html; charset=utf-8", "response_sent_at": "2017-12-23T16:55:37.280Z" }

Но у этой библиотеки обнаружились серьёзные недостатки:

  • Название полей в логе изменить нельзя без наследования Formatter и кастомизацию.

  • Нельзя никаким образом поставить в логирование тело ответа без написания собственного Middleware для FastAPI, который перехватит еще и тело ответа, что мы добавили в заголовки.

  • Логгирование перехватывало синхронный поток вывода логгера uvicorn.access.

Я не буду писать, как я пытался закостылить исправить первые два минуса, пока не столкнулся с тем, что мне всё-таки нужен асинхронный логер. Я лучше Вам расскажу, как я решил сделать собственное решение.
Для начала я решил отказаться от этой библиотеки сразу:

pip uninstall json-logging

Далее я хотел обозначить структуру журнала. Мне очень помог Pydantic для элегантного описания данного вопроса:

from typing import Union  from pydantic import BaseModel, Field   class BaseJsonLogSchema(BaseModel):     """     Схема основного тела лога в формате JSON     """     thread: Union[int, str]     level: int     level_name: str     message: str     source: str     timestamp: str = Field(..., alias='@timestamp')     app_name: str     app_version: str     app_env: str     duration: int     exceptions: Union[list[str], str] = None     trace_id: str = None     span_id: str = None     parent_id: str = None      class Config:         allow_population_by_field_name = True   class RequestJsonLogSchema(BaseModel):     """     Схема части запросов-ответов лога в формате JSON     """     request_uri: str     request_referer: str     request_protocol: str     request_method: str     request_path: str     request_host: str     request_size: int     request_content_type: str     request_headers: str     request_body: str     request_direction: str     remote_ip: str     remote_port: str     response_status_code: int     response_size: int     response_headers: str     response_body: str     duration: int

После того, как была сформирована структура тела запроса необходимо определить сам механизм форматирования. Формирование, что логично, производится с помощью создания класса JSONLogFormatter с наследованием от стандартного logging.Formatter.
Код test_project.utils.json_logger.py

import datetime import json import logging import traceback  from test_project.main.settings import APP_NAME, \     APP_VERSION, ENVIRONMENT from test_project.utils.utils_schemas import BaseJsonLogSchema   class JSONLogFormatter(logging.Formatter):     """     Кастомизированный класс-форматер для логов в формате json     """      def format(self, record: logging.LogRecord, *args, **kwargs) -> str:         """         Преобразование объект журнала в json          :param record: объект журнала         :return: строка журнала в JSON формате         """         log_object: dict = self._format_log_object(record)         return json.dumps(log_object, ensure_ascii=False)      @staticmethod     def _format_log_object(record: logging.LogRecord) -> dict:         """         Перевод записи объекта журнала         в json формат с необходимым перечнем полей          :param record: объект журнала         :return: Словарь с объектами журнала         """         now = datetime \             .datetime \             .fromtimestamp(record.created) \             .astimezone() \             .replace(microsecond=0) \             .isoformat()         message = record.getMessage()         duration = record.duration \             if hasattr(record, 'duration') \             else record.msecs         # Инициализация тела журнала         json_log_fields = BaseJsonLogSchema(             thread=record.process,             timestamp=now,             level=record.levelno,             level_name=LEVEL_TO_NAME[record.levelno],             message=message,             source=record.name,             duration=duration,             app_name=APP_NAME,             app_version=APP_VERSION,             app_env=ENVIRONMENT,         )          if hasattr(record, 'props'):             json_log_fields.props = record.props          if record.exc_info:             json_log_fields.exceptions = \                 traceback.format_exception(*record.exc_info)          elif record.exc_text:             json_log_fields.exceptions = record.exc_text         # Преобразование Pydantic объекта в словарь         json_log_object = json_log_fields.dict(             exclude_unset=True,             by_alias=True,         )         # Соединение дополнительных полей логирования         if hasattr(record, 'request_json_fields'):             json_log_object.update(record.request_json_fields)          return json_log_object

Нужно заставить FastApi ловить request и response, желательно с наличием body. Я сначала пытался обратиться к dependency подходу FastApi. Но понял, что не хочу описывать в каждом методе метод логирования. Естественно, к нам на помощь приходит Middleware.
Я написал класс, который позволяет внедрить в app-объект журналирование тела запроса и ответа, а также их полей: status code, headers, remote ip, protocol, etc. Класс я назвал LoggingMiddleware и все выполнение происходит методу async def __call__()

Код test_project.utils.middlewares.py

from test_project.main.settings import PORT from test_project.utils.json_logger import EMPTY_VALUE from test_project.utils.utils_schemas import RequestJsonLogSchema   class LoggingMiddleware:     """     Middleware для обработки запросов и ответов с целью журналирования     """     @staticmethod     async def get_protocol(request: Request) -> str:         protocol = str(request.scope.get('type', ''))         http_version = str(request.scope.get('http_version', ''))         if protocol.lower() == 'http' and http_version:             return f'{protocol.upper()}/{http_version}'         return EMPTY_VALUE      @staticmethod     async def set_body(request: Request, body: bytes) -> None:         async def receive() -> Message:             return {'type': 'http.request', 'body': body}          request._receive = receive      async def get_body(self, request: Request) -> bytes:         body = await request.body()         await self.set_body(request, body)         return body      async def __call__(             self, request: Request, call_next: RequestResponseEndpoint,             *args, **kwargs     ):         start_time = time.time()         exception_object = None         # Request Side         try:             raw_request_body = await request.body()             # Последующие действия нужны,              # чтобы не перезатереть тело запроса # и не уйти в зависание event-loop'a             # при последующем получении тела ответа             await self.set_body(request, raw_request_body)             raw_request_body = await self.get_body(request)             request_body = raw_request_body.decode()         except Exception:             request_body = EMPTY_VALUE          server: tuple = request.get('server', ('localhost', PORT))         request_headers: dict = dict(request.headers.items())         # Response Side         try:             response = await call_next(request)         except Exception as ex:             response_body = bytes(                 http.HTTPStatus.INTERNAL_SERVER_ERROR.phrase.encode()             )             response = Response(                 content=response_body,                 status_code=http.HTTPStatus.INTERNAL_SERVER_ERROR.real,             )             exception_object = ex             response_headers = {}         else:             response_headers = dict(response.headers.items())             response_body = b''             async for chunk in response.body_iterator:                 response_body += chunk             response = Response(                 content=response_body,                 status_code=response.status_code,                 headers=dict(response.headers),                 media_type=response.media_type             )         duration: int = math.ceil((time.time() - start_time) * 1000) # Инициализация и формирования полей для запроса-ответа         request_json_fields = RequestJsonLogSchema(             request_uri=str(request.url),             request_referer=request_headers.get('referer', EMPTY_VALUE),             request_protocol=await self.get_protocol(request),             request_method=request.method,             request_path=request.url.path,             request_host=f'{server[0]}:{server[1]}',             request_size=int(request_headers.get('content-length', 0)),             request_content_type=request_headers.get(                 'content-type', EMPTY_VALUE),             request_headers=json.dumps(request_headers),             request_body=request_body,             request_direction='in',             remote_ip=request.client[0],             remote_port=request.client[1],             response_status_code=response.status_code,             response_size=int(response_headers.get('content-length', 0)),             response_headers=json.dumps(response_headers),             response_body=response_body.decode(),             duration=duration         ).dict()         # Хочется на каждый запрос читать          # и понимать в сообщении самое главное,          # поэтому message мы сразу делаем типовым         message = f'{"Ошибка" if exception_object else "Ответ"} ' \                   f'с кодом {response.status_code} ' \                   f'на запрос {request.method} \"{str(request.url)}\", ' \                   f'за {duration} мс'         logger.info(             message,             extra={                 'request_json_fields': request_json_fields,                 'to_mask': True,             },             exc_info=exception_object,         )         return response

У меня есть одержимость по поводу асинхронного логирования в FastApi. Методы вывода потока в консоль, запись в файл, logstash — это тот же I/O процесс, как и сгонять по сетке в БД или в другой сервис, согласны? А так как это I/O процесс, то в асинхронном FastApi очень важно не замедлять работу такой простой штукой как логирование. Я нашел очень хорошую библиотеку, которая за одну конфигурацию перестраивает вывод в консоль в асинхронный режим (также можно провернуть и с файлами и отправкой в logstash, и другие ресурсы). Данная библиотека называется: asynclog. Настройка и установка производится очень просто. Не нужно иметь никаких классов и переопределять handler-ы: Нужно просто настроить логирование через словарь в конфигурации проекта и задать handler.

Создаем конфигурацию логера и использовать его везде. Благо, в python есть для этого конфигурация, которую можно воплотить в настройках проекта.

LOG_CONFIG = {     'version': 1,     'disable_existing_loggers': False,     'formatters': {         'json': {             '()': 'test_project.utils.json_logger.JSONLogFormatter',         },     },     'handlers': { # Используем AsyncLogDispatcher для асинхронного вывода потока.          'json': {             'formatter': 'json',             'class': 'asynclog.AsyncLogDispatcher',             'func': 'test_project.utils.json_logger.write_log',         },     },     'loggers': {         'test_project': {             'handlers': ['json'],             'level': 'DEBUG' if DEBUG else 'INFO',             'propagate': False,         },         'uvicorn': {             'handlers': ['json'],             'level': 'INFO',             'propagate': False,         },       # Не даем стандартному логгеру fastapi работать по пустякам и замедлять работу сервиса         'uvicorn.access': {             'handlers': ['json'],             'level': 'ERROR',             'propagate': False,         },     }, }

Здесь у handler есть особенность в поле func, тут должна быть указана функция вывода потока. Это может быть stdout, print, write file. Что угодно, вплоть до передачи файла по сети в logstash. В нашем случае я решил просто вывести в консоль простейшим способом для примера.
Код test_project.utils.json_logger.py:

def write_log(msg):     print(msg)

Настраиваем приложение на работу с данным middleware, инициализируем logging и его конфигурацию. Далее производим написание обычного route с логгированием сообщения ‘Test log’.

import logging from logging.config import dictConfig  from fastapi import FastAPI from test_project.utils.middlewares import LoggingMiddleware dictConfig(LOG_CONFIG) logger = logging.getLogger(name) app = FastAPI() app.middleware('http')(     LoggingMiddleware() ) router = APIRouter(     tags=['root'],     responses={404: {'description': 'Not found'}}, ) @router.get('/') async def root(request: Request):     logger.info('Test log')     return {'foo': 'bar'} app.include_router(router)

Получаем сразу два сообщения в консоли:
Первое: как раз таки тот вывод из Middleware, который гарантирует отображение полей запроса

{     "thread": 60535,     "level": 20,     "level_name": "Information",     "message": "Ответ с кодом 200 на запрос GET \"http://localhost:8080/\", за 4 мс",     "source": "test_project.utils.middlewares",     "@timestamp": "2021-08-28T01:49:23+03:00",     "app_name": "test_project",     "app_version": "1.0.0",     "app_env": "LOCAL",     "duration": 4,     "request_uri": "http://localhost:8080/",     "request_referer": "",     "request_protocol": "HTTP/1.1",     "request_method": "GET",     "request_path": "/",     "request_host": "127.0.0.1:8080",     "request_size": 0,     "request_content_type": "",     "request_headers": "{\"host\": \"localhost:8080\"}",     "request_body": "",     "request_direction": "in",     "remote_ip": "127.0.0.1",     "remote_port": "50259",     "response_status_code": 200,     "response_size": 13,     "response_headers": "{\"content-length\": \"13\", \"content-type\": \"application/json\"}",     "response_body": "{\"foo\":\"baz\"}" }

Второе: Наше сообщение 'Test Log' вне контекста HTTP запроса, а уже в контексте работы внутри route

{     "thread": 60535,     "level": 20,     "level_name": "Information",     "message": "Test log",     "source": "test_project.main.routes",     "@timestamp": "2021-08-28T01:49:23+03:00",     "app_name": "test_project",     "app_version": "1.0.0",     "app_env": "LOCAL",     "duration": 385,     "trace_id": "3bd6f269dbbbd275d46320c7e240bfe5",     "span_id": "2417303bd1af1ee6",     "parent_id": null }

Бонус. Демонстрация производительности асинхронного вывода.

Почему я так сильно хотел асинхронный вывод журнала? Сейчас будет представлено нагрузочное тестирование на обычном примере одного и того же запроса, с одними и теми же конфигурациями сервера.

  • Количество воркеров: 10

  • Количество одновременно-подключенных клиентов: 200

  • Таймаут: 15 секунд.

  • Время выполнения теста: 15 секунд (больше и не надо)

  • Логгирование одного и того же сообщения: test.

  • Formatter не менял свой код.

Команда запуска нагрузочного тестирования:

wrk -c200 -t1 -d15s http://localhost:8080/users/public/ --timeout 15

Результат нагрузочного тестирования синхронного логгирования:

Результат нагрузочного тестирования асинхронного логгирования:

Как можно заметить, что синхронный вариант вывода дал 2294 rps, а асинхронный вариант вывода выдал целых 8364 rps. Получается, разница производительности целого сложного веб-сервиса увеличилась в 3.64 раза благодаря простому переходу и подходу к логгированию.

Вывод

Никогда не стоит соглашаться на сторонние библиотеки, если чувствуете, что они не могут удовлетворить ваших потребностей. Наследоваться от сторонних библиотек может обернуться плохой шуткой и лучше сразу не полениться и попробовать написать что-то своё. Да, это решение вряд-ли подойдет 99.99% пользователям, есть моменты, которые бы улучшил, оптимизировал и сделал бы более гибкими. Как раз про это я бы хотел почитать в комментариях, обсудить, как можно улучшить данный подход.

Пока что данная реализация находится в закрытой разработке одного из моих проектов, но я обещаю, что дополню эту статью ссылкой на гитхаб, либо подготовлю уже PyPi решение для вас, дорогие читатели данной статьи.

Telegram: @Lev_key


ссылка на оригинал статьи https://habr.com/ru/articles/575454/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *