
Системы оркестрации контейнеров существенно упростили управление многокомпонентными системами, в том числе основанными на микросервисной архитектуре. Но остался открытым вопрос организации надежного обмена сообщениями между микросервисами, координации последовательности операций при распределенной архитектуре. В этой статье мы рассмотрим подход Incubating (CNCF)-проекта Dapr (Distributed Application Runtime) по использованию Sidecar-контейнеров в Kubernetes для реализации микросервисной архитектуры, основанной на событиях.
Dapr предлагает для использования механизм обнаружения и взаимодействия между микросервисами (через вызовы методов и обмен сообщениями в модели pub-sub), но также позволяет управлять секретами, отслеживать действия при распределенных вычислениях, реализовать механизмы семафоров для блокировки общих ресурсов, выполнять связывание входных и выходных данных и событий с внешними хранилищами, а также сохранять и восстанавливать после сбоев состояние приложения и доставлять до распределенных приложений общую конфигурацию. Dapr использует концепцию акторов (actors) для описания логики и публикует HTTP и gRPC интерфейсы для взаимодействия с библиотекой. При этом Dapr создает только обобщенные интерфейсы, а реальное хранение секретов и состояния обеспечивается другими системами (например, SPIFFE или Postgres). Важной особенностью Dapr является использование Sidecar-контейнеров, которые присоединяются к существующим Pod’ам и выступают в роли посредника во взаимодействии с остальными сервисами. Таким образом каждый микросервис взаимодействует только с известным его Dapr Runtime (без необходимости встраивания его в код приложения), запущенный в том же pod, и никак не зависит от расположения других микросервисов (которые могут быть расположены в том числе в других облачных провайдерах), используемых решений для хранения состояния и секретов и другого. Dapr может работать как самостоятельный оркестратор (через утилиту командной строки dapr), так и совместно с Kubernetes (в этом случае для управления используется Dapr operator). Во втором случае конфигурация определяется через аннотации Deployment / DaemonSet / StatefulSet, а также CRD Component для конфигурирования хранилищ системных сервисов.
Для просмотра текущего состояния (распределенной конфигурации, доступных микросервисов и др.) Dapr устанавливает дополнительный Dashboard с доступом через веб-браузер.
В настоящий момент Dapr поддерживает SDK для следующих языков программирования (но теоретически можно использовать и любой другой, достаточно сделать реализацию клиента для HTTP или gRPC API):
-
C++
-
Go
-
Java / Kotlin (в том числе, с Spring Boot)
-
JavaScript (поддерживается также Express)
-
.NET
-
PHP
-
Python (может использоваться с Flask, FastAPI, gRPC)
-
Rust
Для настройки Dapr установим утилиту командой строки. После установки мы можем выполнить инициализацию Dapr:
dapr init
При установке в standalone-режиме дополнительно запускаются контейнеры dapr_redis для хранения состояния, dapr_zipkin для отслеживания истории и измерений времени при обработке распределенных вызовов, dapr_placement для управления запуском управляющих контейнеров. Для установки Dapr под управлением kubernetes нужно указать дополнительный флаг —kubernetes (и можно изменить пространство имен для запуска оператора —namespace) и —wait для ожидания завершения запуска и настройки оператора и вспомогательных контейнеров:
dapr init --kubernetes --namespace darp_system --wait
Далее запустим dashboard (dapr dashboard) и подключимся к нему по адресу http://localhost:8080. Как можно увидеть, по умолчанию dapr создает хранилище конфигурации и состояния и механизм pub-sub.

Попробуем теперь сделать простое приложение, которое будет сохранять состояние при завершении и восстанавливать его при запуске. Для примера будем создавать код на Python. Прежде всего установим зависимости.
pip install dapr
И создадим код, который будет взаимодействовать с компонентом хранения состояния в Dapr:
from dapr.clients import DaprClient store_name = 'statestore' key = 'invocation_count' with DaprClient() as d: try: state = d.get_state(store_name, key=key) counter = int(state.data) except: counter = 0 counter+=1 print(f'New counter is {counter}') d.save_state(store_name, key=key, value=str(counter))
Теперь запустим приложение с дополнительным управляющим контейнером dapr:
dapr run --app-id myapp -- python3 counter.py
При нескольких последовательных запусках значение счетчика будет сохраняться и увеличиваться на 1 при каждом запуске.
Опубликуем теперь метод для использования другими микросервисами, для этого импортируем класс App из dapr.ext.grpc и аннотируем метод через @app.method с указанием названия метода (можно будет использовать совместно с идентификатором приложения для вызова метода, например через команду dapr invoke). Аннотированный метод будет получать объект типа InvokeMethodRequest (из него можно получить дополнительные данные, добавленные к запросу) и возвращать InvokeMethodResponse с кодированной байтовой последовательностью с ответом (может быть, например, json):
from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse from dapr.clients import DaprClient from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType from dapr.clients.grpc._state import StateItem store_name = 'statestore' key = 'invocation_count' app = App() @app.method('increment_counter') def increment(request: InvokeMethodRequest) -> InvokeMethodResponse: with DaprClient() as d: try: state = d.get_state(store_name, key=key) counter = int(state.data) except: counter = 0 counter+=1 d.save_state(store_name, key=key, value=str(counter)) return InvokeMethodResponse(str(counter).encode(), 'text/plain; charset=UTF-8') app.run(10080)
Созданное приложение опубликуем на произвольный порт и укажем его при запуске сценария через dapr run, а затем подключимся (для теста) через утилиту командной строки:
dapr run --app-id myapp --app-port 10080 --app-protocol grpc -- python3 counter_service.py & dapr invoke --app-id myapp --method increment_counter
При вызове dapr invoke будет отображаться ответ функции, опубликованной в приложении myapp (число увеличивается на 1 при каждом запуске dapr invoke). Для просмотра списка активных приложений можно использовать dapr list:
dapr list APP ID HTTP PORT GRPC PORT APP PORT COMMAND AGE CREATED DAPRD PID CLI PID myapp 50386 50387 10080 python3 counter_s... 9s 2022-12-14 23:52.08 56930 56924
Важно, что dapr invoke можно использовать только при self-hosted установке (через dapr init в Docker, без использования Kubernetes). Поэтому рассмотрим также, как можно выполнить взаимодействие между микросервисами. Для этого нам понадобится уже известный объект класса DaprClient и метод invoke_method, который принимает идентификатор вызываемого приложение, название метода и дополнительные данные (при необходимости).
import json import time from dapr.clients import DaprClient with DaprClient() as d: resp = d.invoke_method( 'myapp', 'increment_counter', ) print(resp.text(), flush=True)
теперь при запуске приложения из этого сценария мы будем видеть увеличивающееся целочисленное значение в логах выполнения приложения (в случае с Kubernetes их можно просмотреть через dapr logs, для self-hosted установки они доступны в выводе команды dapr run).
dapr run --app-id caller -- python3 caller.py ✅ You're up and running! Both Dapr and your app logs will appear here. INFO[0000] placement tables updated, version: 0 app_id=client instance=d-zolotov-osx scope=dapr.runtime.actor.internal.placement type=log ver=1.9.5 == APP == 16 ✅ Exited App successfully
Далее рассмотрим сценарий взаимодействия микросервисов через механизм pub/sub-очереди. В этом варианте использования ответ не предполагается получение ответа для сообщения, но публикующее обработчик приложение должно вернуть одно из значений статуса обработки (success — успешно, drop — сообщение пропущено и не должно быть обработано, retry — произошла ошибка и сообщение должно быть обработано позднее).
from dapr.ext.grpc import App from dapr.clients.grpc._response import TopicEventResponse from cloudevents.sdk.event import v1 import json from dapr.clients import DaprClient store_name = 'statestore' pubsub_name = 'pubsub' key = 'invocation_counter' topic = 'counter' app = App() print('Publisher is initialized') @app.subscribe(pubsub_name=pubsub_name, topic=topic) def increment(event: v1.Event) -> TopicEventResponse: with DaprClient() as d: try: state = d.get_state(store_name, key=key) counter = int(state.data) except: counter = 0 counter+=1 d.save_state(store_name, key=key, value=str(counter)) print(f'New counter value is {counter}') return TopicEventResponse("success") app.run(10080)
Для удобства отладки можно дополнительно указать опцию —enable-api-logging для отображения обращений в API Dapr (например, чтение-запись состояния, подписка на очередь и другие). Также для корректного отображения вывода в консоль для сервиса добавим флаг -u (unbuffered) для python3:
dapr run --app-id myapp --app-port 10080 --app-protocol grpc --enable-api-logging -- python3 -u counter_service.py
Теперь мы можем протестировать отправку сообщения через консоль (для self-hosted решения):
dapr publish --pubsub pubsub --topic counter --publish-app-id="myapp"
или через клиента
import json import time from dapr.clients import DaprClient with DaprClient() as d: d.publish_event( pubsub_name='pubsub', topic_name='counter', data="" )
dapr run --app-id client -- python3 caller.py
Здесь важно отметить различия в сценариях вызова сервиса (ожидается ответ, вызов предполагает доступность вызываемого метода приложения в момент вызова) и отправки сообщения (сообщения будут сохраняться, даже если обработчик сейчас недоступен, и будут отправлены последовательно на обработку при запуске приложения, подписанного на эту очередь). Но если мы проведем простой эксперимент и отправим несколько сообщений при отключенном myapp и затем его запустим, то мы увидим несколько одинаковых сообщений об увеличении счетчика (в связи с тем, что обработчики будут запущены параллельно и начнут конкурировать за общий ресурс). Попробуем исправить эту ситуацию через использование Distributed Lock, для этого необходимо настроить lockstore, для этого создадим файл описания компонента в ~/.dapr/components/lockstore.yaml.
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: statestore spec: type: state.redis version: v1 metadata: - name: redisHost value: localhost:6379 - name: redisPassword value: "" - name: actorStateStore value: "true"
Также, чтобы исключить состояние гонки при вызове самого метода захвата блокировки, добавим произвольную задержку:
from dapr.ext.grpc import App from dapr.clients.grpc._response import TopicEventResponse from cloudevents.sdk.event import v1 import json import time import random from dapr.clients import DaprClient store_name = 'statestore' lockstore_name = 'lockstore' lock_name = 'counter_lock' pubsub_name = 'pubsub' client_id = 'myapp' expiration = 10# для исключения бесконечной блокировки key = 'invocation_counter' topic = 'counter' app = App() print('Publisher is initialized') @app.subscribe(pubsub_name=pubsub_name, topic=topic) def increment(event: v1.Event) -> TopicEventResponse: with DaprClient() as d: time.sleep(random.randint(1,500)/1000) # исключаем race condition для try_lock with d.try_lock(lockstore_name, lock_name, client_id, expiration): try: state = d.get_state(store_name, key=key) counter = int(state.data) except: counter = 0 counter+=1 d.save_state(store_name, key=key, value=str(counter)) print(f'New counter value is {counter}') return TopicEventResponse("success") app.run(10080)
Теперь множественные сообщения из очереди будут обрабатываться корректно и каждое из событий приведет к увеличению счетчика на 1.
Альтернативным способом удаленного вызова методов является использование акторов. Добавим необходимую зависимость:
pip install dapr-ext-fastapi-dev
(также поддерживаются другие серверы, например Flask).
Код сервера будет представлять собой REST API с регистрацией точки подключения как актора:
from dapr.actor import Actor, ActorInterface, actormethod from dapr.actor.runtime.runtime import ActorRuntime from dapr.actor.runtime.config import ActorRuntimeConfig, ActorTypeConfig from dapr.clients import DaprClient from dapr.ext.fastapi import DaprActor from fastapi import FastAPI store_name = "statestore" key = "invocation_counter" class CounterActorInterface(ActorInterface): @actormethod("Increment") async def increment(self) -> int: pass class CounterActor( Actor, CounterActorInterface ): # дополнительно можно подключить Remindable и реализовать set_reminder, receive_reminder, set_timer, timer_callback def __init__(self, ctx, actor_id): super(CounterActor, self).__init__(ctx, actor_id) async def increment(self): with DaprClient() as d: try: state = d.get_state(store_name, key=key) counter = int(state.data) except: counter = 0 counter += 1 d.save_state(store_name, key=key, value=str(counter)) print(f"New counter value is {counter}") return counter config = ActorRuntimeConfig() ActorRuntime.set_actor_config(config) app = FastAPI(title=f"{CounterActor.__name__}Service") actor = DaprActor(app) @app.on_event("startup") async def fastapi_startup(): await actor.register_actor(CounterActor)
Запустим приложение, публикующее этого актора:
dapr run --app-id myapp --app-port 3000 --enable-api-logging -- uvicorn --port 3000 counter_actor:app
Для проверки создадим простого клиента:
import asyncio from dapr.actor import Actor, ActorProxy, ActorId, ActorInterface, actormethod class CounterActorInterface(ActorInterface): @actormethod("Increment") async def increment(self) -> int: pass async def main(): proxy = ActorProxy.create('CounterActor', ActorId('1'), CounterActorInterface) result = await proxy.invoke_method("Increment") value = result.decode("utf8") print(f'New counter is {value}') asyncio.run(main())
Также нередко для доступа к внешним сервисам или при создании интеграций нужно обеспечить надежное хранение секретов и возможность их получить из микросервисов. В следующем примере мы посмотрим как можно получить доступ к единому хранилищу секретов (в нашем случае на основе простого json-файла, но может использовать любое поддерживаемое vault-решение). Добавим конфигурацию для secretsstore в ~/.dapr/config.yaml:
apiVersion: dapr.io/v1alpha1 kind: Configuration metadata: name: daprConfig spec: tracing: samplingRate: "1" zipkin: endpointAddress: http://localhost:9411/api/v2/spans secrets: scopes: - storeName: "localsecretstore" defaultAccess: "deny" allowedSecrets: ["secretKey",]
добавим конфигурацию компонента:
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: localsecretstore namespace: default spec: type: secretstores.local.file metadata: - name: secretsFile value: secrets.json - name: nestedSeparator value: ":"
и создадим json-файл с конфигурацией ключей (secrets.json):
{ "secretKey": "mysecretvalue", }
теперь доработаем код и получим значение секрета:
secretKey = 'secretKey' secretStoreName = 'localsecretstore' //... print(f'Secret value is {d.get_secret(secretStoreName, secretKey).secret["secretKey"]}')
Мы рассмотрели основные сценарии использования Dapr для организации взаимодействия микросервисов. Во второй части статьи мы поговорим о возможностях наблюдения за выполнением последовательности вызовов (distributed tracing) и об интеграции с внешними источниками данных (binding).
Использование Dapr в Kubernetes принципиально не отличается от self-hosted установки (кроме того, что будут недоступны команды dapr publish и dapr invoke). Конфигурация компонентов (хранилище секретов и состояния, брокер очереди сообщений и др.) определяются через CRD Component (рассмотренные выше примеры будут работать и в Kubernetes). Дополнительно можно использовать следующие команды dapr:
-
components — отображает зарегистрированные компоненты и их конфигурацию
-
configurations — отображает известные конфигурации dapr
-
logs — просмотр логов для присоединенного dapr-контейнера
-
status — информация о состоянии сервисов dapr (включая оператор и инжектор, который используется для присоединения sidecar)
-
upgrade — обновление контейнеров dapr до актуальной версии
При публикации приложения через Deployment / StatefulSet / DaemonSet в kubernetes необходимые атрибуты (app-id, app-port и другие) могут быть определены в аннотациях шаблона (spec.template.annotations: dapr.io/enabled=true для присоединения sidecar к поду, dapr.io/app-id — идентификатор процесса, dapr.io/app-port — порт для публикации).
Примеры кода с конфигурацией можно посмотреть в официальном репозитории (например, для Python), а также найти рассмотренные выше фрагменты кода и сценарии для запуска в github-репозитории.
Материал подготовлен в преддверии старта курса Microservice Architecture от OTUS. Узнать подробнее о курсе и зарегистрироваться на бесплатный урок можно по ссылке ниже.
ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/706186/
Добавить комментарий