Реализация Триггеров TSQL на Python

от автора

В прошлой статье я рассказал про общую структуру проекта, про работу Kafka с CDC для получения данных из базы. Теперь пришло время поговорить про саму реализацию триггеров на Python. Как говорилось в предыдущей статье, мы будем реализовывать только триггеры Before (Instead Of останутся в базе без изменений). Итак, что же нам необходимо предусмотреть при разработке?

  1. Каждый триггер будет запускаться как отдельный Deployment в K8s, то есть нужно предусмотреть удобный запуск триггеров.

  2. Один триггер может обрабатывать только один топик из Kafka.

  3. В каждом триггере должна быть возможность точечно настраивать фильтры по получаемым из Kafka данным.

Чтение Kafka

Для начала необходимо реализовать чтение топиков Kafka. На этом этапе мы знаем, что один топик — это данные из одной таблицы, и один триггер может обрабатывать данные только из одного топика. Таким образом, мы пришли к реализации ServiceLocator. Только мы будем реализовывать этот паттерн через декораторы.

@SubscribeKafkaTopik('Sales') class TrSalesUpdate(ABCTrigger):

Таким образом, при запуске нашего сервиса мы будем сразу получать топик, который нужно будет слушать.

class MetaTriggers(type):     def __getitem__(cls, trigger_name):         return cls.__triggets__[trigger_name]  class SubscribeKafkaTopik(metaclass=MetaTriggers):     __triggets__ = {}     topok_name = None      def __new__(cls, topik):         if not hasattr(cls, 'instance'):             cls.instance = super(SubscribeKafkaTopik, cls).__new__(cls)             cls.instance.topok_name = topik         return cls.instance       def __call__(self, cls):         if cls.__name__ not in self.__triggets__:             self.__triggets__[cls.__name__] = cls(self.topok_name)         return cls       def __init__(self, topik):         self.topok_name = topik      @classmethod     def print(cls):         print(cls.__triggets__)      @classmethod     def get(cls, trigger_name):         if trigger_name not in cls.__triggets__: return None         return cls.__triggets__.get(trigger_name)

Теперь, как же запускать нужный нам топик? Учитывая, что мы реализовали регистрацию каждого триггера и прикрепили сопоставление триггера и топика, нам достаточно реализовать получение названия класса триггера, который мы хотим запустить через аргументы. Мы сделали это через ArgumentParser.

parser.add_argument('--trigger', help='Запускаемый триггер', default=os.getenv('TRIGGER_CLS'))

Пример запуска:

python main.py --trigger TrSalesUpdate

Далее мы получаем название триггера и, по названию триггера, начинаем слушать топик Kafka, который указан в декораторе SubscribeKafkaTopic.

 SubscribeKafkaTopik[args.trigger].listen()

Что здесь происходит?

Во первых, мы получаем из SubscribeKafkaTopic класс, который был зарегистрирован как триггер с использованием декоратора @SubscribeKafkaTopic. Так как при запуске мы указали параметр —trigger TrSalesUpdate, то и на этапе SubscribeKafkaTopic[args.trigger] нам вернется класс TrSalesUpdate. Но откуда метод listen() и что он делает? Тут тоже все достаточно просто. Наш класс TrSalesUpdate, да и все другие классы, которые являются триггерами, унаследованы от базового класса ABCTrigger.

class ABCTrigger(ABC):       def __init__(self, topik_name = None):         if topik_name:             self.consumer = KafkaConsumer(                 topik_name,                 group_id=self.__class__.__name__,                 api_version=(0,10),                 bootstrap_servers=",".join(credentials['kafka']['bootstrap_servers']),                 auto_offset_reset='latest',                 value_deserializer=lambda x: loads(x.decode('utf-8')) if x is not None else None,             )             self.consumer.poll(timeout_ms=10000)      @abstractmethod     def call(self, message, key = None):         ...       def listen(self):         print(f"Start Listen kafka {self.__class__.__name__}")         for message in self.consumer:             if message is None: continue             Thread(target=self.call, args=(message.value, message.key)).start()

В этом классе есть метод listen(), который начинает слушать топик Kafka. Таким образом, конструкция SubscribeKafkaTopic[args.trigger].listen() запускает получение сообщений из указанного топика.

Во-вторых, после получения сообщений из указанного топика Kafka, сообщение передаётся в метод call.

Thread(target=self.call, args=(message.value, message.key)).start()

Таким образом, в каждом триггере должна быть реализация метода call. Для того, чтобы реализовать логику обработки сообщений для каждого отдельного триггера

def call(self, message, key = None): ...

Теперь нужно рассмотреть, как обрабатывать различные типы событий, чтобы наш триггер срабатывал только при необходимых нам действиях.

Фильтры событий

Как говорилось ранее, нам доступны следующее события:

  • r — read — операция чтения данных из таблицы. Возникает в момент подключения Kafka-Connect к таблице.

  • c — create — операция создания записи, аналог insert

  • u — update — операция обновления записи

  • d — delete — операция удаления записи

Зная эти типы событий, нам необходимо сделать так, чтобы в дальнейшем наш триггер выполнялся при одном или нескольких событиях.

Например, у нас есть триггер обновления цены счета после того, как товар в счете был добавлен, изменен или удален. Соответственно, такой триггер должен срабатывать при следующих событиях: c, u, d.

Или другой пример: нам необходимо в момент добавления товара в счет добавлять себестоимость этого товара в таблицу с товарами в счете. Такой триггер уже должен работать только при событии создания (insert), то есть с типом c.

Реализовать такой фильтр по событиям мы решили через декораторы.

@FilterActionType('u', 'c') def call(self, message, key = None): ...

Таким образом, если тип события не совпадает с типом, который был передан в декоратор, то метод call не сработает.

Скрытый текст
class FilterActionType:     def __init__(self, *actions):         self.actions = actions      def __call__(self, fn):         def call_func(*args, **kwargs):             if args[1]['payload']['op'] in self.actions: return fn(*args, **kwargs)             return False         return call_func

Однако это не всё. Что если триггер должен срабатывать только в том случае, если изменилось какое-то определённое поле либо список определённых полей? И только в том случае, если необходимые нам поля затронуты при изменении, выполнять триггер. Возьмём тот же пример с обновлением цены счета при изменении товара в счёте. Цену счёта нам нужно менять только если изменилось количество товара или изменилась цена товара. Но нам не нужно, чтобы триггер срабатывал, когда, например, изменено описание у этого товара в счёте.

Это мы также сделали через декоратор.

@FilterActionType('u') @FilterUpdatedRow('Price', 'Quantity') def call(self, message, key = None): ...

Таким образом, если условие не подойдет хотя бы по одному из декораторов, то метод выполнен не будет.

Скрытый текст
class FilterUpdatedRow:     def __init__(self, *columns):         self.columns = columns      def __call__(self, fn):         def call_func(*args, **kwargs):             for column in self.columns:                 if args[1]['payload']['before'][column] != args[1]['payload']['after'][column]:                     return fn(*args, **kwargs)             return False         return call_func

А теперь представим такую ситуацию. У нас есть отгрузочный документ, который закрывается, переводится на последнюю стадию, например, с типом «Закрыли». В таком случае нам необходимо проставить дату, когда был закрыт этот документ. При этом важно учитывать, что дату нужно проставлять только у тех документов, которые были закрыты только на складе A. Таким образом, нам нужно реализовать триггер, который сработает при следующих условиях:

  1. Триггер только на событие обновления

  2. Триггер только на изменение поле Status

  3. Триггер только на измене поля Status на значение равное Закрыто и Склад должен быть равен значению A

Что мы имеем сейчас? У нас есть два фильтра, которые помогут нам реализовать два первых пункта. Но что делать с третьим? Правильно, реализовать!

Что мы видим из ТЗ? Нам нужно реализовать фильтр по значению строк. Это достаточно просто, так как у нас из Kafka приходят значения after и before, и нам достаточно проверить, равно ли значение указанного поля (в нашем случае Status = ‘Закрыто’ и Store = ‘A’) значению в блоке after. И тут мы можем пойти двумя путями.

  1. Реализовать декоратор отдельно для каждого поля. То есть в каждый отдельный декоратор мы будем передавать проверку на значение каждого поля. И это будет работать, так как если хоть один из декораторов не сработает, метод call не выполнится. Это подходит под логическое «И». Однако стоит предусмотреть тот вариант, что в ТЗ может стоять «ИЛИ», и тогда нам этот вариант не подойдет.

  2. Реализовать декоратор, который принимает тип фильтрации (or или and) и в зависимости от этого фильтрует данные. И мы решили выбрать именно этот путь. Не понятно? Сейчас посмотрим на реализацию, и всё станет понятнее.

Для начала посмотрим, как эта фильтрация будет выглядеть в триггере:

@FilterActionType('u') @FilterUpdatedRow('Status') @FilterRowData(       and_(           [               lambda record: record['after']['Status'] == 'Закрыто',               lambda record: record['after']['Store'] == 'A',           ]       )  )  def call(self, message, key = None): ...

В @FilterActionType(‘u’) проверяем, что тип события это обновление

В @FilterUpdatedRow(‘Status’) проверяем, что во время обновления было изменено именно значение поля Status

В @FilterRowData реализуем проверку наших значений. Так как в задаче нужно проверить, что Status = ‘Закрыто’ и Store = ‘A’, мы реализовали проверку этих условий через and_, в который передаем лямбда-методы по проверке данных. Таким образом, мы можем реализовать любую логику проверки данных, которые мы получаем из Kafka.

Скрытый текст
class BaseFunc:     def __init__(self, filters = []):         self.filters = filters

Скрытый текст
class and_(BaseFunc):       def __call__(self, data):         if not self.filters: return True         result_filters = []         for idx, filter_ in enumerate(self.filters):             if (idx >= 2) & (sum(result_filters) != idx): return False             result_filters.append(int(filter_(data)))          return sum(result_filters) == len(self.filters)  class or_(BaseFunc):       def __call__(self, data):         if not self.filters: return True         for filter_ in self.filters:             if filter_(data): return True          return False

Запуск проверки происходит в декораторе FilterRowData. И если фильтр, который мы передали в конструктор (в данном случае and_), вернёт False, то метод call не отработает.

class FilterRowData:     def __init__(self, filter_func: BaseFunc = None):         self.filter_func = filter_func      def __call__(self, fn):         def call_func(*args, **kwargs):             if self.filter_func is None: return fn(*args, **kwargs)             if self.filter_func(args[1]['payload']): return fn(*args, **kwargs)             return False         return call_func

Итоги

Таким образом, мы реализовали основные фильтры для данных, которые получаем из топиков Kafka, и в каждом из запущенных триггеров можно точечно настраивать фильтры под те данные, которые мы ожидаем получить и с которыми будем работать в данном классе. При этом мы реализовали достаточно простую регистрацию триггеров, что позволяет легко запускать нужный обработчик.

Однако остаются и задачи в бэклоге. Например, нужно сделать так, чтобы одна запись обрабатывалась триггером лишь однажды. То есть, если запись с одним и тем же идентификатором обновлялась несколько раз, то триггер для этой записи должен сработать только один раз. Или же реализовать чтение из базы слейва, а запись в базу мастера, чтобы разгрузить рабочую базу, с которой мы будем работать больше всего.


ссылка на оригинал статьи https://habr.com/ru/articles/839338/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *