Освоение gRPC на Python: Продвинутые техники. Часть III

от автора

После рассмотрения прошлых тем gRPC на Python:

Перейдем к третьей (завершающей) части, в которой разберем следующие разделы:

  1. Interceptor;

  2. Трассировка с использованием Jaeger и OpenTelemetry;

  3. Reflection;

  4. Потоковая передача данных;

  5. Health Checking и цепочка вызовов (Deadlines).

Перед тем, как приступить к изучению новых технологий, ознакомимся с архитектурой микросервиса, реализованного в нашем демонстрационном проекте.

Архитектура демонстрационного проекта

Архитектура демонстрационного микросервиса

Архитектура демонстрационного микросервиса
  1. Postman — клиент-приложение для удобной демонстрации работы streaming запросов и ответов, а также для показа выполнения функционала reflection на стороне сервера.

  2. Browser — клиент-приложение, используем любой браузер.

  3. API gateway FastAPI — сервер, предназначенный для запуска gPRC сервисов и также для удобной демонстрации работы.

  4. Interceptor — перехватчики в микросервисах gRPC, реализующие логику валидации запроса (аутентификацию).

  5. Order — микросервис gRPC, реализующий CRUD операции заказов.

  6. Check — микросервис gRPC, реализующий проверку статуса заказа. Служит для показа выполнения цепочки вызовов в микросервисной архитектуре.

  7. Echo — микросервис gRPC, являющийся эхо-сервером. Служит для показа работы потоковой передачи данных.

  8. Health — микросервис gRPC, реализующий проверку работоспособности сервиса.

  9. Jaeger — платформа для трассировки, используемая для мониторинга микросервисной архитектуры.

  10. Tracing(OpenTelemetry) — набор стандартов и инструментов для сбора данных о производительности и работы микросервисов.

  11. DB — база данных (SQLite), используемая для хранения информации о заказах.

Interceptor

Interceptor — это компонент, который позволяет перехватывать и изменять запросы и ответы на стороне клиента или сервера до их обработки или отправки. Interceptor позволяет добавлять дополнительную функциональность, такую как логирование, аутентификацию, авторизацию, кэширование и другие функции, не изменяя основной код приложения. В нашем случае, мы будем использовать данную технологию, в качестве аутентификации.

Для демонстрации аутентификации с помощью Interceptor, была добавлена конечная точка get_token FastApi для получения jwt-токена с временем жизни 1 день, с помощью которого будем получать доступ к сервисам gRPC.

@router.get("/get_token") async def get_token() -> JSONResponse:     payload = {         "username": "0xN1ck",         "exp": datetime.utcnow() + timedelta(days=1)     }     token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')     return JSONResponse({'rpc-auth': f'{token}'}, status_code=status.HTTP_200_OK) 

Итак, как же нам реализовать саму логику Interceptor? Для этого мы выполним следующие шаги:

  1. Первым этапом создадим файл interceptor.py, в котором реализуем два класса AuthInterceptor — для сервера и KeyAuthClientInterceptor — для клиента.

from functools import partial  import grpc import jwt from grpc.aio import ClientCallDetails   class AuthInterceptor(grpc.aio.ServerInterceptor):     def __init__(self, key):         # При инициализации создаем атрибут _valid_metadata,         # который будет хранить секретный ключ для валидации токена пользователя         self._valid_metadata = key      @staticmethod     async def deny(_, context, details):         # Функция, предназначенная для отправки сообщений пользователю при отработках ошибок в функции intercept_service         await context.abort(grpc.StatusCode.UNAUTHENTICATED, details)      async def intercept_service(self, continuation, handler_call_details):         # Получаем кортеж, содержащий метаданные         metadatums = handler_call_details.invocation_metadata         try:             # Получаем токен из метаданных             resault = next(filter(lambda x: x.key == 'rpc-auth', metadatums))             if jwt.decode(resault.value, self._valid_metadata, algorithms=['HS256']):                 return await continuation(handler_call_details)         except StopIteration:             return grpc.unary_unary_rpc_method_handler(partial(self.deny, details="Токен не найден"))         except jwt.ExpiredSignatureError:             return grpc.unary_unary_rpc_method_handler(partial(self.deny, details="Время жизни токена истекло"))         except jwt.InvalidTokenError:             return grpc.unary_unary_rpc_method_handler(partial(self.deny, details="Токен не валиден"))   class KeyAuthClientInterceptor(grpc.aio.UnaryUnaryClientInterceptor):     def __init__(self, user_token):         # Получаем токен пользователя         self.user_token: str = user_token      async def intercept_unary_unary(self, continuation, client_call_details, request):         # Добавляем токен в метаданные с ключом rpc-auth и отправляем запрос на сервер         metadata = []         if client_call_details.metadata is not None:             metadata = list(client_call_details.metadata)         metadata.append(("rpc-auth", self.user_token))         new_details = ClientCallDetails(             client_call_details.method,             client_call_details.timeout,             metadata,             client_call_details.credentials,             client_call_details.wait_for_ready,         )         response = await continuation(new_details, request)         return response 

В классе KeyAuthClientInterceptor добавляем токен в метаданные с ключом rpc-auth и отправляем запрос на сервер.
В классе AuthInterceptor добавляем проверку на валидность токена. Если токен валиден, то пропускаем запрос далее, в противном случае возвращаем ошибку.

Теперь нам нужно применить данные классы в наших клиентах и сервере gPRC.

Для сервиса Order реализуем клиента следующим образом в файле clients/order.py:

import grpc from fastapi import Request from grpc_core.protos.order import order_pb2_grpc from grpc_core.servers.interceptors import KeyAuthClientInterceptor from settings import settings   async def grpc_order_client(request: Request):     # Так как мы используем FastApi для демонстрации, прокинем токен в заголовок запроса     auth = request.headers.get("rpc-auth")     # При создании канала используем параметр interceptors для добавления нашего перехватчика, в который передаем токен     channel = grpc.aio.insecure_channel(         f'{settings.GRPC_HOST_LOCAL}:{settings.GRPC_PORT}',         interceptors=[             KeyAuthClientInterceptor(auth),         ],     )     client = order_pb2_grpc.OrderServiceStub(channel)     return client 

Таким образом мы создаем клиента для сервиса Order, в который передаем наш перехватчик с полученным токеном для валидации. Для сервиса Check реализация клиента будет аналогичная.

И теперь регистрируем перехватчики на сервере похожим способом, как и на клиенте в файле servers/manager.py:

self.server = aio.server(     ThreadPoolExecutor(max_workers=10),     interceptors=[         AuthInterceptor(settings.SECRET_KEY),     ] ) 

Используем секретный ключ в классе AuthInterceptor для валидации токена.

P.S. В классе KeyAuthClientInterceptor, унаследованного от grpc.aio.UnaryUnaryClientInterceptor, мы создали interceptor для клиента, переопределив метод intercept_unary_unary, который служит для унарных (UnaryUnary) вызовов. Хотя в нашей архитектуре проекта видно, что мы будем рассматривать не только унарные вызовы, но и UnaryStream, StreamUnary, StreamStream в сервисе Echo, мы не определяли необходимые перехватчики для клиента сервиса Echo, потому что будем использовать Postman для демонстрации, который реализует запросы к сервису собственными методами «из-под капота». Создание данных перехватчиков можете рассмотреть самостоятельно. Логика будет несильно отличаться.

Результат проделанной работы:

Невалидный токен

Невалидный токен

Получаем в случае не валидного токена ошибку. Наблюдаем, что перехватчик исправно отработал.

Валидный токен

Валидный токен

В данном запросе видим, что с данным токеном запрос прошел успешно.

Трассировка с использованием Jaeger и OpenTelemetry

При разработке микросервисов часто возникают проблемы с пониманием, что происходит внутри приложения и как запросы проходят через все эти сервисы. И для мониторинга таких процессов нам может помочь трассировка. Она может отслеживать, как запросы проходят через разные части системы, и видеть, сколько времени занимает каждая операция. Это важно, потому что в микросервисной архитектуре запрос может проходить через десятки сервисов, и любая задержка или ошибка в одном из них может повлиять на работу всего приложения.
Jaeger и OpenTelemetry — это два инструмента, которые помогают с трассировкой. Jaeger помогает следить за запросами в распределённых системах. OpenTelemetry — это открытый стандарт, который объединяет практики для сбора трассировок, метрик и логов.
Приступим к процессу применения данных технологий на практике.

Первым делом, при установке необходимых библиотек в демонстрационном проекте, пришлось понизить версию gRPC из-за конфликта зависимостей protobuf. Плагин в OpenTelemetry from opentelemetry.exporter.jaeger.proto.grpc import JaegerExporter требовал protobuf версии 3.20.0, а последняя версия gRPC использовала protobuf пятой версии. В файле pyproject.toml настроены все зависимости.

Следующим шагом необходимо развернуть сам сервис Jaeger. Для этого используем Docker Compose со следующим конфигурационным файлом.

version: '3.9'  services:   jaeger:     image: jaegertracing/all-in-one:${JAEGER_VERSION:-latest}     ports:       - "16686:16686"       - "4318:4318"       - "14250:14250"     environment:       - LOG_LEVEL=debug     networks:       - jaeger-example  networks:   jaeger-example: 

Теперь можно приступить к написанию трассировочного кода. Для этого мы при запуске сервера gRPC (servers/manager.py) реализуем настройку трассировки.

# Создаем экспортёр для отправки трассировочных данных в Jaeger. # Указываем адрес коллектора и разрешаем небезопасное соединение (без шифрования). jaeger_exporter = JaegerExporter(     collector_endpoint=f'{settings.JAEGER_HOST}:{settings.JAEGER_PORT}',     insecure=True )  # Создаем процессор для пакетной обработки трассировочных данных (спанов). # Он будет собирать спаны и отправлять их в Jaeger с использованием созданного экспортёра. span_processor = BatchSpanProcessor(jaeger_exporter)  # Устанавливаем глобальный провайдер трассировки. # Указываем, что ресурс трассировки будет иметь имя "Order". # Это имя будет использоваться для идентификации службы в системе трассировки. trace.set_tracer_provider(     TracerProvider(resource=Resource.create({SERVICE_NAME: "Order"})) )  # Добавляем созданный процессор спанов в провайдер трассировки. # Это необходимо для того, чтобы спаны обрабатывались и отправлялись в Jaeger. trace.get_tracer_provider().add_span_processor(span_processor)  # Создаем инструмент для автоматической трассировки gRPC сервера. grpc_server_instrumentor = GrpcAioInstrumentorServer()  # Включаем автоматическую трассировку. # Это позволит автоматически отслеживать все вызовы gRPC на сервере. grpc_server_instrumentor.instrument()  # Такая же настройка клиента, как и для сервера grpc_client_instrumentor = GrpcAioInstrumentorClient() grpc_client_instrumentor.instrument() 

Спан представляет собой единицу выполнения операции. Он отслеживает конкретные события, связанные с запросом, создавая картину того, что произошло во время выполнения этой операции.

Далее рассмотрим использование трассировочного кода в одном из наших сервисов. Возьмем для примера OrderService и реализуем трассировку в методе ListOrders.

async def ListOrders(self, request, context) -> order_pb2.ListOrdersResponse:     # Получаем трассировщик для текущего модуля     tracer = trace.get_tracer(__name__)     # Создаем новый span для текущего метода ListOrders     with tracer.start_as_current_span("ListOrders") as span:         try:             logger.info(f'Получен запрос на получение списка заказов')             result = await OrderHandler.list_orders()             response = self.message.dict_to_rpc(                 data=result.dict(),                 request_message=order_pb2.ListOrdersResponse(),             )              # Устанавливаем атрибут статус-кода RPC в span             span.set_attribute("rpc.grpc.status_code", "OK")             # Добавляем событие успешного ответа в span             span.add_event("Successful response", {"response": str(response)})             return response          except Exception as e:             # Устанавливаем статус span как ошибочный и добавляем сообщение об ошибке             span.set_status(Status(StatusCode.ERROR, str(e)))             # Добавляем событие ошибки в span с деталями об ошибке             span.add_event("Error response", {"error": str(e)}) 

Проверим, как будет отрабатывать наша трассировка.

Jaeger

Jaeger
Request info

Request info

Мы видим, что после отправки нескольких запросов, трассировка успешно произвела записи в Jaeger.

Reflection

Reflection — это механизм, который позволяет клиентам динамически исследовать возможности сервиса gRPC. Он предоставляет информацию о доступных методах и структурах данных сервиса, что упрощает разработку, отладку и тестирование, особенно в случаях, когда отсутствуют заранее подготовленные протобаф-файлы (protobuf). В нашем случае это упростит работу в приложение Postman.

На практике настройка Reflection проста и настраивается в инициализации сервера gRPC (servers/manager.py).

# Определение кортежа SERVICE_NAMES, содержащего полные имена сервисов, зарегистрированных на сервере. SERVICE_NAMES = (     # Получение полных имени сервисов (OrderService, ...) из дескрипторов (order_pb2, ...).     order_pb2.DESCRIPTOR.services_by_name["OrderService"].full_name,     echo_pb2.DESCRIPTOR.services_by_name["EchoService"].full_name,     health_pb2.DESCRIPTOR.services_by_name["Health"].full_name,     check_pb2.DESCRIPTOR.services_by_name["CheckStatusOrderService"].full_name,     # Добавление стандартного имени сервиса reflection (reflection service).     reflection.SERVICE_NAME, ) # Включение reflection для перечисленных в SERVICE_NAMES сервисов. reflection.enable_server_reflection(SERVICE_NAMES, self.server) 

Так как на нашем сервере настроен Interceptor, то в Postman также потребуется добавить токен в метаданные.

Reflection token

Reflection token
Reflection success

Reflection success

По скриншотам видно, что Postman увидел наши сервисы без применения protobuf-файлов.

Потоковая передача данных

gRPC поддерживает четыре режима взаимодействия между сервером и клиентом:

  1. Unary gRPC (Однонаправленное gRPC): клиент отправляет запрос и ожидает ответа от сервера.

  2. Server Streaming gRPC (Потоковая передача от сервера): сервер отвечает на запрос клиента потоком сообщений, завершая передачу сообщением о состоянии.

  3. Client Streaming gRPC (Потоковая передача от клиента): клиент отправляет поток сообщений серверу, который в ответ посылает одно подтверждающее сообщение.

  4. Bi Directional Streaming gRPC (Двунаправленная потоковая передача): клиент и сервер обмениваются потоками сообщений одновременно, при этом каждый поток передается независимо в обоих направлениях.

Streaming

Streaming

Так как до этого мы уже использовали обычные Unary gRPC (Однонаправленное gRPC) запросы, то пропустим данный пункт и рассмотрим остальные три режима взаимодействия в Echo сервисе.

Первым делом, когда начинаем писать сервис gRPC, наша работа начинается с protobuf-файла. Для эхо-сервера он будет выглядеть следующим образом:

syntax = "proto3";  package echo;  message EchoMessage { string username = 1; string message = 2; }  message DelayedReply { repeated EchoMessage response = 1; }  service EchoService { // Client Streaming rpc ClientStream (stream EchoMessage) returns (DelayedReply);  // Server Streaming rpc ServerStream (EchoMessage) returns (stream EchoMessage);  // Both Streaming rpc BothStream (stream EchoMessage) returns (stream EchoMessage); } 

После генерации python-файлов c помощью grpc-tools, как было рассмотрено в предыдущей нашей статье приступим к написанию Echo сервиса (servers/services/echo.py).

import asyncio from loguru import logger  from grpc_core.protos.echo import echo_pb2 from grpc_core.protos.echo import echo_pb2_grpc  from grpc_core.servers.utils import GrpcParseMessage   class EchoService(echo_pb2_grpc.EchoServiceServicer):     def __init__(self):         self.message = GrpcParseMessage()      # Асинхронный метод для обработки клиентского стрима.     async def ClientStream(self, request_iterator, context) -> echo_pb2.DelayedReply:         # Создание ответа с отложенным ответом.         response = echo_pb2.DelayedReply()         # Асинхронный цикл для обработки каждого запроса из стрима.         async for request in request_iterator:             logger.info(f'Приняли запрос от стрим клиента: {self.message.rpc_to_dict(request)}')             response.response.append(request)         return response      # Асинхронный метод для обработки серверного стрима.     async def ServerStream(self, request, context):         logger.info(f'Приняли запрос от клиента: {self.message.rpc_to_dict(request)}')         # Цикл для отправки нескольких ответов клиенту.         for _ in range(3):             # Отправка текущего запроса обратно в качестве ответа.             yield request             logger.info(f'Ответил стрим сервер: {self.message.rpc_to_dict(request)}')             await asyncio.sleep(1)      # Асинхронный метод для обработки двунаправленного стрима.     async def BothStream(self, request_iterator, context):         # Асинхронный цикл для обработки каждого запроса из стрима.         async for request in request_iterator:             logger.info(f'Приняли запрос от стрима клиента: {self.message.rpc_to_dict(request)}')             # Цикл для отправки нескольких ответов клиенту на каждый запрос.             for i in range(3):                 # Отправка текущего запроса обратно в качестве ответа.                 yield request                 logger.info(f'Ответил стрим сервер: {self.message.rpc_to_dict(request)}')                 await asyncio.sleep(1) 

Как видим, для эхо-сервера настройка streaming-методов ClientStream, ServerStream и BothStream была совсем не сложной. Давайте посмотрим, как это все будет выглядеть в приложении Postman на примере BothStream.

BothStream

Streaming both

Streaming both

Health Checking и цепочка вызовов (Deadlines).

Health Checking

Health Checking — это механизм, используемый для мониторинга и контроля состояния сервисов, чтобы убедиться, что они работают должным образом и могут обрабатывать запросы. Данная технология осуществляется с помощью функций Check и Watch и возвращает статусы: UNKNOWN, SERVING и NOT_SERVING*. Для проверки состояния используется функция Check с помощью отправки Unary запроса и для мониторинга — функция Watch с помощью ServerStream запроса.

Для рассмотрения Health Checking на практике реализуем сервис Health (servers/services/health.py). Данный подход можно использовать и для других сервисов, которые выполняют свою логику, переопределив методы Check и Watch. Также
можно ознакомится с примером, который указан на официальном сайте gRPC Health Checking example.

import asyncio  from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc   class HealthService(health_pb2_grpc.HealthServicer):     # Асинхронный метод для проверки состояния.     async def Check(self, request, context):         # Возвращаем объект HealthCheckResponse со статусом SERVING, указывая, что сервис работает нормально         return health_pb2.HealthCheckResponse(             status=health_pb2.HealthCheckResponse.SERVING         )      # Асинхронный метод для подписки на обновления (мониторинга) состояния.     async def Watch(self, request, context):         while True:             current_status = health_pb2.HealthCheckResponse.SERVING             response = health_pb2.HealthCheckResponse(status=current_status)             yield response             await asyncio.sleep(1) 

Цепочка вызовов (Deadlines).

Для рассмотрения данного вопроса был создан сервис Check, который принимает запрос от Order. Он выполняет логику проверки состояния заказа и возвращает статус его выполнения, и при получении ответа сервис Order обновляет статус заказа в БД. По итогу у нас получается следующая цепочка вызовов, в которой рассмотрим технологию Deadlines.

Chain of calls

Chain of calls
# (api/oreder.py) @router.post("/check") async def check_order_status(         uuid: str,         client: t.Any = Depends(grpc_order_client),         key: str = Security(api_key_header), ) -> JSONResponse:     try:         # устанавливаем тайм-аут в 2 секунды, определяя за сколько времени должна будет отработать вся цепочка вызовов         order = await client.CheckStatusOrder(check_pb2.CheckStatusOrderRequest(uuid=uuid), timeout=2)     except AioRpcError as e:         raise HTTPException(status_code=404, detail=e.details())      return JSONResponse(MessageToDict(order)) 

Далее на сервисе Order поставим ожидание в 1 секунду и выполним логгирование, чтобы убедиться, сколько времени еще осталось, перед отправлением запроса на сервис Check.

await asyncio.sleep(1) logger.info(f'Осталось времени в цепочке вызовов: {context.time_remaining()}') # 0.98  client = await grpc_check_client(auth=auth) response = await client.CheckStatusOrder(     check_pb2.CheckStatusOrderRequest(uuid=request.uuid),     # передаем в тайм-аут, оставшееся время на выполнение     timeout=context.time_remaining() )  await OrderHandler.update_after_check_order(response) 

Тут наблюдаем, что осталось всего 0.98 секунд. Если сервис Check не успеет за это время, то отработает исключение, и можем получить следующий результат.

Deadline

Deadline

В случае, если мы поставим ожидание более 2 секунд на сервисе Order, то и запрос не пройдет далее и цепочка вызовов будет завершена.

Из всего этого можно выделить следующие преимущества Deadlines:

  • Управление временем выполнения запросов. Установка сроков позволяет ограничить время, в течение которого запрос может выполняться. Это предотвращает зависание клиента в ожидании ответа и позволяет более эффективно использовать ресурсы.

  • Повышение надежности системы. Сроки помогают предотвратить ситуации, когда один медленный или зависший запрос блокирует выполнение других запросов. Это особенно важно в высоконагруженных системах.

Заключение

Мы завершили серию статей, посвящённых изучению gRPC на Python, охватив широкий спектр тем и аспектов, которые пригодятся при проектировании gRPC-сервисов. В третьей заключительной части мы подробно рассмотрели:

  1. Interceptor: Разобрались, как реализовать перехватчики для аутентификации запросов с использованием JWT-токенов. Технология позволяет добавлять дополнительную функциональность без изменения основного кода приложения.

  2. Трассировка с использованием Jaeger и OpenTelemetry: Настроили и применили инструменты для трассировки, что позволяет отслеживать прохождение запросов через микросервисы и выявлять возможные задержки или ошибки.

  3. Reflection: Реализовали механизм Reflection, который позволяет клиентам динамически исследовать возможности сервиса gRPC. Это упростило работу с Postman без необходимости заранее подготовленных protobuf-файлов.

  4. Потоковая передача данных: Рассмотрели и реализовали различные типы взаимодействия (Client Streaming, Server Streaming, Bi-Directional Streaming) в gRPC на примере Echo-сервиса. Это позволило продемонстрировать обмен данными в реальном времени между клиентом и сервером.

  5. Health Checking и цепочка вызовов (Deadlines): Рассмотрели механизм Health Checking для мониторинга состояния сервисов, а также реализовали цепочку вызовов между микросервисами, применив использование Deadlines для контроля времени выполнения запросов.

В данной серии статей были охвачены ключевые аспекты и предоставлены практические примеры, которые помогут Вам в ваших проектах. Теперь вы знаете, как создавать и интегрировать различные компоненты gRPC, что позволит строить эффективные и надёжные микросервисные архитектуры.

P.S. Для ознакомления с лабораторными проектом можно перейти по ссылке grpc_example


ссылка на оригинал статьи https://habr.com/ru/articles/830210/