
Введение
В наше время, с развитием бизнеса и увеличением объемов автоматизации и бизнес-процессов, эффективная автоматизация некоторых таких бизнес-процессов становится ключевым фактором успеха для многих компаний. Наша компания не является исключением. По мере роста и развития нашего бизнеса наша CRM-система столкнулась с проблемами медленной работы и увеличения блокировок в базе. Увеличившееся количество бизнес-процессов и интеграций привело к сильной загрузке системы, что начало вызывать блокировки в базе данных и временные остановки процессов внутри компании – ситуация, которую нельзя оставить без изменений.
Мы поняли, что для продолжения успешной работы нам необходимо решение, способное справиться с ежедневной нагрузкой, интегрировать новые сервисы и обрабатывать сложные процессы без задержек. Таким образом, мы столкнулись с вопросами: как избавиться от проблем блокировки, как оптимизировать интеграции и как эффективно обрабатывать сложные запросы, не загружая при этом базу данных.
В результате наших размышлений мы приняли решение разделить нашу CRM-систему на микросервисы. В этой статье я хотел бы поделиться с вами нашим опытом разгрузки CRM-системы и показать, какими путями мы шли для решения выше поставленных вопросов.
Самое начало
Изначально наша CRM-система была загружена не сильно. Было несколько бизнес-процессов, которые в основном отвечали за обновление суммы счета при изменении товара в нем. Все эти процессы были реализованы с использованием триггеров, поскольку наша система, к сожалению, не предоставляла других возможностей. Однако со временем количество таких бизнес-процессов, реализованных на триггерах, стало увеличиваться.
Кроме того, появились и другие системы, которые должны были обмениваться данными с нашей CRM-системой, такие как 1С, сайт, складская система 1С и так далее. В результате всех этих интеграций и процессов мы столкнулись с увеличением количества блокировок, которые существенно затрудняли работу сотрудников внутри нашей CRM-системы.
Разделение на Мастер — Слейв
Первое, что пришло нам в голову, это разделить базу на Master-Slave. Мы включили репликацию транзакциями и стали получать данные на слейве в реальном времени, что позволило нам вынести все интеграции и скрипты, которые активно читают базу, на слейв, чтобы не загружать мастера, на котором активно работают пользователи. Это помогло на первое время, пока количество бизнес-процессов (которые были реализованы на триггерах) не стало еще больше. И тут мы начали задумываться, как можно избавиться от этих триггеров и сделать так, чтобы они отрабатывали асинхронно, не блокируя транзакцию пользователя.
Немного о триггерах
Когда пользователь вставляет запись в таблицу, запускаются триггеры на INSERT (AFTER/INSTEAD OF). И пока все триггеры, которые запускаются при событии добавления записи, не отработают, транзакция на вставку новой записи не завершится. Таким образом, мы можем столкнуться (и столкнулись) с блокировками.
Мы решили перенести все триггеры на Python-скрипты. Однако нам нужно было придумать, как мы будем получать новые транзакции и тип этой транзакции (INSERT/UPDATE/DELETE). И тут мы пришли к CDC и Kafka-Connect.
Подключение CDC и Kafka-Connect к MSSQL
Тут нам на помощь пришли CDC и Kafka Connect. На слейве мы включили CDC. Почему именно на слейве? Здесь все просто: при включении CDC MSSQL создает системные таблицы, в которых хранит все транзакции, которые были сделаны. Поскольку у нас за сутки может быть более 2 миллионов транзакций над одной таблицей, такой журнал будет занимать много места, что может негативно сказаться на производительности сервера с работающей базой данных. Поэтому решено было включить CDC на слейве, чтобы не затрагивать (не нагружать) мастера. Кроме того, у нас настроена репликация транзакциями, поэтому данные на слейв поступают в реальном времени.
CDC (Change Data Capture) — это механизм, используемый в базах данных SQL для отслеживания и захвата изменений данных в базе данных. CDC позволяет автоматизировано отслеживать и записывать изменения, производимые над данными в реальном времени. Этот механизм обычно используется для репликации данных, управления журналами изменений, аудита и других целей, где важно знать, какие изменения были внесены в базу данных и когда.
CDC обычно работает путем захвата изменений, происходящих с данными, и записи этих изменений в специальные таблицы — таблицы журнала изменений (change log tables) или журнальные файлы.
Kafka Connect — это платформа для интеграции данных, которая позволяет легко и надежно подключать различные источники и приемники данных к Apache Kafka. Она предоставляет готовые инструменты для создания и запуска коннекторов, которые позволяют передавать данные между Kafka-топиками и внешними системами.
Концепция Kafka Connect основана на идее использования легковесных коннекторов для обеспечения интеграции с различными системами без необходимости писать собственный код. Коннекторы могут быть разработаны и поддерживаться сообществом или сторонними поставщиками, что позволяет упростить процесс интеграции и повысить масштабируемость.
Kafka Connect состоит из двух основных компонентов:
Connectors — компоненты, которые определяют и управляют потоком данных между источниками данных и Kafka, а также между Kafka и приемниками данных.
Workers — процессы, запускаемые на узлах кластера Kafka Connect, которые выполняют коннекторы и обрабатывают передачу данных.
Kafka Connect позволяет значительно упростить процесс интеграции данных, улучшить надежность и масштабируемость передачи данных между различными системами с помощью Apache Kafka.
Включение CDC
Hidden text
USE [db_name]; GO -- Включение CDC для БД EXEC sys.sp_cdc_enable_db GO --Включение CDC для Таблицы posts EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'posts', @role_name = NULL, @filegroup_name = N'my_file_group' --Можно создать в настройках базы GO
После включения CDC, мы развернули кластер Kafka для получения данных из таблиц. Мы решили, что данные в топиках будут храниться не более часа, а вся история будет грузиться в Minio на случай если придется работать только с затронутыми за период данными (а такие БП есть).
После чего мы подключили Kafka-Connect к базе и начали получать данные в топиках Kafka
Пример Kafka-Connect к MSSQL
Hidden text
#!/bin/bash curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d ' { "name": "<тут название>", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "<Тут IP MSSQL>", "database.port": "<Тут Порт>", "database.user": "<Тут Пользователь БД>", "database.password": "<Тут пароль от пользователя БД>", "database.names": "<Тут Название БД>", "database.encrypt": "false", "topic.prefix": "<тут название префикса>", "table.include.list": "<тут таблица>", "schema.history.internal.kafka.bootstrap.servers": "<Тут адреса Кластера Kafka указанные через ,>", "schema.history.internal.kafka.topic": "####" } }
Пример Kafka-Connect к S3
Hidden text
#!/bin/bash curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d ' { "name": "<тут название>", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "store.url":"<Тут адрес Minio/S3>", "tasks.max":"1", "topics":"<Тут Название топика из которого нужно забирать данные>", "s3.bucket.name":"<Тут Название Бакета в Minio/S3>", "s3.region":"us-east-1", "s3.part.size":"5242880", "flush.size":"100000", "locale":"en", "path.format":"'date'=YYYY-MM-dd/'hour'=HH", "partition.duration.ms":"600000", "rotate.schedule.interval.ms":"60000", "timezone":"UTC", "timestamp.extractor":"Record", "format.class":"io.confluent.connect.s3.format.json.JsonFormat", "enable.ssl.certificate.verification":"False", "storage.class":"io.confluent.connect.s3.storage.S3Storage", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "key.converter.schemas.enable":"false", "behavior.on.null.values":"ignore" } }
Пример данных в Kafka
Hidden text
{ "schema": { "type": "struct", "fields": [], "optional": false, "name": "####", "version": 1 }, "payload": { "before": {}, "after": {}, "source": { "version": "2.1.2.Final", "connector": "sqlserver", "name": "####", "ts_ms": 1703674805647, "snapshot": "false", "db": "####", "sequence": null, "schema": "####", "table": "####", "change_lsn": "00002073:00081cb5:0006", "commit_lsn": "00002073:00081da4:0020", "event_serial_no": 2 }, "op": "u", "ts_ms": 1703674805975, "transaction": null } }
Таким образом мы получаем все новые транзакции в топиках Kafka. И в реальном времени можем их обрабатывать + у нас есть тип транзакций:
— r — Read (Появляется когда Kafka считывает все данные из таблицы)
— c — Create Появляется при вставке новой записи в таблицу
— u — Update Проявляется при изменении записи в таблице
— d — Delete появляется при удалении записи из таблицы
Также в топиках есть данные о записи которая была до изменения, что позволяет нам сравнивать данные до и после изменения.
Получение данных из топиков Kafka в Pyhon
О том как работает Kakfa есть хорошая статья: тут https://practicum.yandex.ru/blog/broker-soobsheniy-apache-kafka/ и тут https://habr.com/ru/companies/slurm/articles/550934/
Как же нам получать данные в наших скриптах?
Мы использовали библиотеку kafka и с ее помощью подключались к кластеру Kafka
KafkaConsumer( self.__context.topik, bootstrap_servers=",".join(credentials['kafka'['bootstrap_servers']), auto_offset_reset='latest', group_id=f"{self.__context.table}.{self.__context.trigger}", value_deserializer=lambda x: loads(x.decode('utf-8')) if x is not None else None, api_version=(0,10) )
Здесь все достаточно просто. Первым параметром мы указываем топик который будем слушать, далее указываем список адресов Kafka в кластере через запятую. Далее в параметре auto_offset_reset указываем что хотим получать только новые записи. И наконец параметр group_id в котором мы указываем ID группы слушателей. Так как один топик могут слушать несколько скриптов Consumer’ов, то нам нужно указать group_id, чтобы в контексте одной группы, каждый скрипт получал уникальное сообщение. Таким образом мы можем разгрузить нагруженные топики увеличив кол-во патриций и кол-во Consumer’ов.
Теперь как же обрабатывать данные? Во-первых в данных, которые мы получаем есть «op», который содержит тип транзакции, о которых я писал выше. Следовательно, мы можем реализовывать обработчики на различные события (INSERT/UPDATE/DELETE), прям как в триггерах MSSQL (это как раз то к чему мы и шли). Теперь, зная о том что мы можем получать тип транзакции мы реализуем декоратор, который будет принимать тип ожидаемой транзакции и если она совпадает то выполнять нужный нам скрипт в противном случае ничего не делать.
Пример реализации декоратора
class OnAction: def __init__(self, actions=[]): self.actions = actions def __call__(self, fn): def call_func(*args, **kwargs): if args[1]['payload']['op'] in self.actions: return fn(*args, **kwargs) return call_func
Пример использования Декторатора
class AddPurchaseToShedex(Module): def __init__(self, context = None, param = {}): super().__init__(context, param) @OnAction(['u', 'c']) def __call__(self, message): ...
Во-вторых надо разобраться с данными которые мы хотим получать. В JSON который отправляется в Kafka есть объект before с данными которые были в таблицы до обновления либо после удаления и объект after с данными которые поступили при создании записи либо при ее изменении. Здесь все еще проще если к нам приезжает событие INSERT/UPDATE то новые данные будут лежать в after. Если к нам приезжает событие DELETE то данные которые были удалены будут лежать в before
Появление K8s
Хорошо, скрипты, которые реализуют логику Триггеров из БД у нас есть. Но один слушатель не справляется с нагрузкой которая поступает в топик Kafka. Да и к тому же, нужно администрировать большое количество таких триггеров и иметь возможность их скейлить. Что делать? Мы увеличили кол-во партиции и нам нужно увеличить кол-во слушателей в каждой группе. Тут 2 варианта:
1. Мы можем запускать несколько процессов силами Python
2. Мы можем использовать K8s
Первый вариант нас не устраивал. Во-первых, потому что скрипт может вывалиться и его придется поднимать руками (не всегда, можно обойтись конечно и DockerCompose), во-вторых у нас таких скриптов становится много, и их нужно админить, обновлять и масштабировать. Если мы будем использовать например DockerCompose то это это сработает в рамках одного сервера.
deploy: replicas:3 #количество копий скрипта, которые вы хотите запустить
Но нам нужно иметь возможность масштабировать скрипты по разным серверам. И тут нам на помощь приходит K8s, который позволил нам :
1. Автоматизировать деплой новых версий с использованием Gitlab-CI + ArgoCD.
2. Упростить увеличение кол-ва слушателей через параметр REPLICASET Нам достаточно в values.yaml указать количество реалик скрипта и на каких серверах он будет отрабатывать (на всех или только на каком то определенном).
3. Автоматизировать запуск скриптов при падении. Если какой-то из скриптов выпадет с ошибкой K8s самостоятельно его поднимет
4. Облегчить введение новых нод в кластер K8s что позволяет нам практически неограниченно расширять наши мощности под новые и новые сервисы
Краткие итоги
Таким образом мы добились следующего
-
За месяц работы, мы развернули новую инфраструктуру на основе K8s, настроили Kafka, реплику
-
Перенесли все основные триггеры на MSA, что значительно разгрузило нашу CRM систему и мы избавились от блокировок и увеличили скорость ее работы.
-
Создали инструмент для разработки новых БП, которые изначально были нам не доступны.
ссылка на оригинал статьи https://habr.com/ru/articles/819931/
Добавить комментарий