Всем привет! Я Станислав Бушуев, Software Engineer в Semrush. Сегодня хочу поделиться идеями, как можно реализовать синхронизацию данных между различными хранилищами. Такие задачи иногда возникают в работе, например, при удалении пользовательских данных в рамках General Data Protection Regulation (GDPR) и California Consumer Privacy Act (CCPA).

Следовать этим законам нетрудно, если вы поддерживаете среднестатистический сайт или небольшой продукт. Скорее всего, в данных случаях используется один из популярных фреймворков, и таблица пользователей хранится в общеизвестной базе данных (MySQL, PostgreSQL).
Пришел запрос на удаление всех данных пользователя? Не проблема! Удаляем строку в таблице, и готово. Не считая логов, трейсов ошибок, бэкапов и других интересных мест. Подразумеваем, конечно, что все настроено верно и личные данные пользователей скрываются звездочками или еще как-то.
В нашем случае ситуация чуть сложнее. За 13 лет в Semrush создали более 50 инструментов, и все они поддерживаются десятками команд разработки. Несколько лет назад каждая команда вынесла код своего инструмента в отдельный микросервис, данные пользователя мы получаем из сервиса пользователей. Но так или иначе мы храним всю информацию, которая необходима для работы инструмента.
Таким образом, перед нами встал вопрос: как синхронизировать хранилища данных микросервиса и сервиса пользователей.
Способы синхронизации
1. Периодически мы можем сверяться с сервисом пользователей по REST API:
$ curl '<http://user-service.internal.net/api/v1/users/42>' [ { "id": 42, "registration_date": "2015-03-08 01:00:00", "email": "john@example.com", "name": "John", ...
Можно даже сразу отправить список пользователей на проверку, но это как-то странно. Удаления пользователей происходят не так часто, чтобы совершать DDoS-атаки на сервис.
2. Событийно-ориентированная архитектура — еще один подход для решения нашей задачи. Здесь появляются две сущности: генератор сообщений (Publisher) и подписчик (Subscriber), который читает канал событий (topic).
Можно отказаться от сущности подписчика и всем микросервисам выставить endpoint API, на который сервис пользователей слал бы запросы при появлении нового события. В таком случае нужно согласовать протоколы взаимодействия API: REST, JSON-RPC, gRPC, GraphQL, OpenAPI или что там еще может быть. Кроме того, необходимо держать конфигурационные файлы микросервисов, куда слать запросы, а самое главное: что делать, когда запрос не доходит до микросервиса после третьего повтора?
Плюсы данной архитектуры:
-
Асинхронная автоматическая синхронизация хранилищ данных.
-
Нагрузка на сервис пользователей повышается незначительно, так мы добавляем только асинхронную запись события в канал событий.
-
Синхронизация данных различных хранилищ находится отдельно от (и так нагруженного) сервиса пользователей.
Минусы:
-
Минус, вытекающий из первого плюса: неконсистентность данных между сервисами пользователей и остальными микросервисами.
-
Отсутствие транзакций: формируются простые сообщения.
-
Необходимо учитывать, что сообщения в очереди могут повториться.
Вообще, все перечисленные преимущества и недостатки довольно условны и зависят от конкретных задач. Универсальных решений, на мой взгляд, нет. По этой теме полезно будет почитать о CAP-теореме.
Реализация событийно-ориентированной архитектуры на примере Pub/Sub от Google Cloud
Существует множество альтернатив: Kafka, RabbitMQ, но наша команда выбрала решение от Pub/Sub Google Cloud, так как мы уже используем Google Cloud (подробнее можно почитать в статье моего коллеги Никиты Шальнова: Что такое Immutable Infrastructure), и оно проще в настройке тех же Kafka или RabbitMQ.

В нашем случае Publisher — это сервис пользователей, а Subscriber — микросервис команды определенного инструмента. Subscriber’ов (как и Subscription) может быть сколько угодно. Учитывая, что в Semrush большое количество инструментов и команд, очередь с подписчиками для нас идеальна:
-
Каждый читает очередь с необходимой для него частотой.
-
Кто-то может выставить endpoint, и topic будет его вызывать немедленно при появлении сообщения (если нужно получать новые сообщения моментально).
-
Даже экзотические варианты, например откат базы данных инструмента из старого бэкапа, не вызывают проблем: просто перечитаем сообщения из топика (стоит оговориться, что нужно учитывать идемпотентность запросов, и, возможно, вам стоит читать топик не с начала, а с какого-то момента).
-
Subscriber предоставляет REST протокол, но для упрощения разработки существуют еще клиенты для различных языков программирования: Go, Java, Python, Node.js, C#, C++, PHP, Ruby.
Можно использовать один топик с разными сообщениями, например, у нас есть несколько типов сообщений: изменение пользователя, изменение доступов пользователя и так далее.
Пример реализации
Создание топика и подписчика:
gcloud pubsub topics create topic gcloud pubsub subscriptions create subscription --topic=topic
Подробнее можно узнать в документации.
Создание пользователя для чтения топика:
gcloud iam service-accounts create SERVICE_ACCOUNT_ID \ --description="DESCRIPTION" \ --display-name="DISPLAY_NAME" gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_ID@PROJECT_ID.iam.gserviceaccount.com" \ --role="pubsub.subscriber" gcloud iam service-accounts keys create key-file \ --iam-account=sa-name@project-id.iam.gserviceaccount.com
Скачанный ключ в формате json нужно сохранить и пробросить сервису. Не забывайте про правила обращения с секретами! Об этом знают все и немного больше мои коллеги из Security-команды. Дайте знать в комментариях, если тема кажется вам полезной и интересной для нашей следующей статьи.
Пользователь для публикации сообщений создается аналогично, за исключением роли: —role=»pubsub.subscriber» → —role=»pubsub.publisher».
Для примера возьмем один из наших микросервисов, работающих на Python c Celery. Для сообщений из сервиса пользователей есть схема, описанная с помощью Protocol Buffers.
import json import os import celery from google.cloud import pubsub_v1 from google.oauth2 import service_account from user_pb2 import UserEventData PUBSUB_SERVICE_ACCOUNT_INFO = json.loads(os.environ.get('PUBSUB_SERVICE_ACCOUNT', '{}')) PUBSUB_PROJECT = 'your project' PUBSUB_SUBSCRIBER = 'subscription' @celery.shared_task def pubsub_synchronisation() -> None: credentials = service_account.Credentials.from_service_account_info( PUBSUB_SERVICE_ACCOUNT_INFO, scopes=['<https://www.googleapis.com/auth/pubsub>'] ) with pubsub_v1.SubscriberClient(credentials=credentials) as subscriber: subscription_path = subscriber.subscription_path(PUBSUB_PROJECT, PUBSUB_SUBSCRIBER) response = subscriber.pull(request={"subscription": subscription_path, "max_messages": 10000}) ack_ids, removed_user_ids = [], [] for msg in response.received_messages: user_event_data = UserEventData() user_event_data.ParseFromString(msg.message.data) removed_user_ids.append(user_event_data.Id) ack_ids.append(msg.ack_id) # Here you can do everything with removed users :) subscriber.acknowledge(request={"subscription": subscription_path, "ack_ids": ack_ids})
И запускаем задание раз в пять минут, так как удаление пользователей — не такая частая операция:
CELERY_BEAT_SCHEDULE = { 'pubsub_synchronisation': { 'task': 'tasks.pubsub_ubs_synchronisation', 'schedule': timedelta(minutes=5) },
Пример публикации сообщений в топик на Python реализуется аналогичным образом. Используйте PublisherClient вместо SubscriberClient и вызывайте метод publish вместо pull.
В результате есть синхронизация удаления пользователей для соответствия законам GDPR/CCPA. Например, в 7:37 произошло массовое удаление аккаунтов из сервиса хранения учетных записей пользователей. В 7:40 сработала задача на получение данных из Topic’а. Все задачи выбраны, локальная база синхронизирована.

В статье мы рассмотрели две архитектуры и остановились на событийно-ориентированной. Вполне вероятно, что в вашем случае можно будет обойтись ручной синхронизацией.
Надеюсь, этот материал окажется полезным. Спасибо за внимание!
ссылка на оригинал статьи https://habr.com/ru/company/semrush/blog/665618/
Добавить комментарий