Бизнес-сериал: формируем BI-систему в строительстве почти в прямом эфире. Часть II

от автора

Привет, друзья!

Продолжаем нашу серию статей о создании BI-системы в компании Sminex. В первой части мы рассказали, что в качестве основного места хранения аналитических данных используется хранилище с якорной моделью. Она характеризуется высокой нормализацией и строгими правилами наименования объектов.

Строгие правила наименования лучше сразу прописать в соглашении об именовании объектов. В нашем случае мы использовали базовое соглашение [Anchor Modeling: Naming Convention] с небольшими доработками и упрощениями.

Однако, чтобы добиться высокой нормализации, каждый объект, независимо от его типа, нужно выделять в отдельную таблицу, прописывать для него DDL и ключи. Учитывая количество объектов, это превращается в огромную рутинную работу. При этом нельзя допускать ситуации «сейчас быстро сделаем, а потом поправим». Это идеальная область для автоматизации в условиях ограничения ресурсов.

Важно!

Мы не претендуем на абсолютную правильность реализации, оптимальность и соответствие нашего решения всем общепринятым правилам. Конкретно в нашей ситуации, с нашими ресурсами и объёмом задач от заказчиков, это решение удалось реализовать достаточно быстро, и оно начало приносить свои плоды. Мы будем рады услышать конструктивную критику по поводу решения. Самое полезное опубликуем в следующей статье!

Шаг 1. Как облегчить работу инженерам по выполнению поставок данных

Необходимо:

  1. Придумать велосипед менее затратный способ укладки данных в якорную модель по сравнению с ручным созданием и настройкой таблиц.

  2. Разработать способ автоматической генерации однотипных SQL-запросов, исключающий ошибки в наименовании объектов и в логике работы с данными.

  3. Сохранять логи о совершенных действиях.

Шаг 2. ООП нам поможет

Для реализации задуманного мы разработали Python-модуль anchor_model.py, в котором создали классы и наделили их необходимыми методами. Давайте познакомимся с ними поближе.

Класс AnchorModel

Этот класс является суперклассом. Все остальные классы наследуются от него. Он содержит три метода:

  • exists — проверяет, существует ли уже такая таблица в DWH.

  • get_description — возвращает описание таблицы.

  • log — записывает в логи создание или наполнение таблицы якорной модели.

class AnchorModel:
class AnchorModel:     """     Суперкласс для работы с таблицами якорной модели     Все остальные классы наследуются от него       Методы:     [exists] проверяет, существует ли уже такая таблица в DWH     [get_description] возвращает описание таблицы (комментарий)     [log] логирует обновление данных     """      def __init__(self):         # Подключение к DWH         self.engine_dwh = get_engine('dwh')         # Название таблицы         self.table_name = None         # Источник данных         self.source = None         # Тип данных (задается через create или get_data_type)         self.data_type = None         # Описание (задается через create или get_description)         self.description = None         # Тип данных поля инкремента         self.increment = None          # Переменные для логирования         self.log_string = ''         self.log_rowcount = 0         self.is_success = False         self.exception = ''      def exists(self):         """         Проверка на то, что такая таблица уже существует в DWH         """         try:             query = f"""                 SELECT EXISTS (                     SELECT 1                     FROM information_schema.tables                     WHERE table_name = '{self.table_name}'                 );             """             exists = self.engine_dwh.execute(query).scalar()              if exists:                 logging.info(f"{self.log_string} Таблица [{self.table_name}] уже существует")                 return True             else:                 logging.info(f"{self.log_string} Таблицы [{self.table_name}] не существует")                 return False          except:             raise          finally:             # Закрытие подключения             self.engine_dwh.dispose()      def get_description(self):         """         Возвращает описание (комментарий к таблице)         """         try:             query = f"""                 SELECT d.description AS table_comment                 FROM pg_class c                 JOIN pg_description d ON c.oid = d.objoid                                      AND d.objsubid = 0 /* уровень 0 = комментарии к таблицам, не к колонкам) */                 WHERE c.relname = '{self.table_name}'             """              description = self.engine_dwh.execute(query).scalar()              if description is None:                 raise Exception(f'Не удалось получить описание таблицы {self.table_name}, '                                 f'проверь название и что у нее есть комментарий')             else:                 self.description = description                 return self.description          except:             raise          finally:             # Закрытие подключения             self.engine_dwh.dispose()      def log(self):         """         Логирует обновление данных         """         try:             query = f"""                 INSERT INTO _log (name, source, rows, is_success, exception)                 VALUES ('{self.table_name}', '{self.source}', {self.log_rowcount}, {self.is_success}, '{self.exception}')             ;"""             self.engine_dwh.execute(query)          except:             raise          finally:             # Закрытие подключения             self.engine_dwh.dispose()

Класс Anchor

Класс Anchor наследуется от AnchorModel. При инициализации необходимо задать обязательный параметр name — имя якоря. В классе Anchor предусмотрены следующие методы:

  • create_anchor — создает таблицу якоря в DWH.

  • update_anchor — обновляет данные в якоре.

  • get_anchor_data_type — возвращает тип данных бизнес-ключа существующего якоря.

  • get_max_anchor_bk — возвращает максимальный ID бизнес-ключа якоря.

class Anchor(AnchorModel):
class Anchor(AnchorModel):     """     Класс для работы с якорем       Атрибуты:     [name] название якоря. При инициализации задается без префикса, напр. tender или document       Методы:     [create_anchor] создает таблицу в DWH с нужным префиксом и триггером для генерации суррогатного ключа     [update_anchor] принимает название источника и SQL-запрос, обновляет данные в якоре     [get_anchor_data_type] возвращает тип данных бизнес-ключа существующего якоря (напр. uuid)     [get_max_anchor_bk] возвращает максимальное значение бизнес-ключа     """      def __init__(self, name: str):         super().__init__()         # Название типа таблицы якорной модели         self.type = 'anchor'         # Название сущности         self.name = name         # Название таблицы = первые два символа названия + _ + название (напр. для tender - te_tender)         self.table_name = self.name[:2] + '_' + self.name         # Название временной таблицы         self.tmp_table_name = self.table_name + '_tmp'         # Название колонки суррогатного ключа = название + _ + id (напр. для tender - tender_id)         self.id_column_name = self.name + '_id'         # Название колонки бизнес ключа = название + _ + bk (напр. для tender - tender_bk)         self.bk_column_name = self.name + '_bk'         # Строка для отображения в логах консоли         self.log_string = f'[Anchor] [{self.name}] ::'      def create_anchor(self, data_type: str, description: str, increment: str = None):         """         Функция для создания таблицы якоря в DWH         [data_type] тип данных якоря         [description] описание (комментарий к таблице)         [increment] тип данных инкремента. Если указан:                   - заменяет колонку insert_ts и хранит в ней значения на момент обновления сущности                   - при загрузке новых данных кроме ключа якоря в запросе также должно фигурировать поле инкремента         """          try:              if self.exists():                 return True              self.data_type = data_type             self.description = description             self.increment = increment              # Создаем таблицу якоря             if self.increment:                 self.engine_dwh.execute(f"""                     CREATE TABLE {self.table_name} (                         {self.id_column_name} uuid PRIMARY KEY,                         {self.bk_column_name} {self.data_type} NOT NULL UNIQUE,                         increment {self.increment} NOT NULL                     );                 """)             else:                 self.engine_dwh.execute(f"""                     CREATE TABLE {self.table_name} (                         {self.id_column_name} uuid PRIMARY KEY,                         {self.bk_column_name} {self.data_type} NOT NULL UNIQUE,                         insert_ts timestamp with time zone DEFAULT now()                     );                 """)              # Добавляем комментарий             self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';")              # Создаем триггер для генерации суррогатного ключа при добавлении нового бизнес-ключа             self.engine_dwh.execute(f"""                 CREATE OR REPLACE FUNCTION generate_{self.id_column_name}()                 RETURNS TRIGGER AS $$                 BEGIN                     NEW.{self.id_column_name} := uuid_generate_v5(uuid_nil(), NEW.{self.bk_column_name}::text);                     RETURN NEW;                 END;                 $$ LANGUAGE plpgsql;             """)              # Триггер             self.engine_dwh.execute(f"""                 CREATE TRIGGER {self.table_name}_trigger                 BEFORE INSERT ON {self.table_name}                 FOR EACH ROW                 EXECUTE PROCEDURE generate_{self.id_column_name}();             """)              # Запись в реестр всех таблиц якорной модели.             # Из нее потом генерируется документация и схема хранилища.             self.engine_dwh.execute(f"""                 DELETE FROM _anchor_model WHERE table_name = '{self.table_name}';                 INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name)                 VALUES ('{self.table_name}', 'anchor', '{self.name}', '{self.data_type}', '{self.name}')             ;""")              logging.info(                 f"{self.log_string} Таблица [{self.table_name}] создана с триггером на суррогатный ключ {self.id_column_name}")          except:             raise          finally:             # Закрытие подключения             self.engine_dwh.dispose()      def update_anchor(self, source: str, query: str, increment: bool = False):         """         Загрузка данных якоря из источника         [source] название БД (как в Airflow Connections)         [query] запрос для выгрузки нужных данных         """         try:             self.data_type = self.get_anchor_data_type()             self.description = self.get_description()             self.source = source              # Загрузка и очистка данных             raw_data = sql(db=source, query=query, description=self.description, as_df=True)             data = purify_data(data=raw_data)              # Если после очистки данных не осталось, то завершаем работу             # Логируем в блоке finally             if data.empty:                 self.is_success = True                 return True              # Начинаем загрузку данных в якорь              # Создаем временную таблицу             self.engine_dwh.execute(f"""                 DROP TABLE IF EXISTS {self.tmp_table_name};""")              if increment:                 self.engine_dwh.execute(f"""                     CREATE TABLE {self.tmp_table_name} (                         {self.bk_column_name} {self.data_type} NOT NULL PRIMARY KEY                     );""")             else:                 self.engine_dwh.execute(f"""                     CREATE TABLE {self.tmp_table_name} (                         {self.bk_column_name} {self.data_type} NOT NULL PRIMARY KEY                     );""")             logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]')              # Загрузка данных во временную таблицу             data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append', index=False, method='multi',                         chunksize=1000)             logging.info(f'Во временную таблицу загружено [{data.shape[0]:_}] строк')              # Наполнение якоря новыми данными             result = self.engine_dwh.execute(f"""                 WITH new_values AS (                     SELECT tmp.{self.bk_column_name}                     FROM {self.tmp_table_name} tmp                     LEFT JOIN {self.table_name} anchor ON anchor.{self.bk_column_name} = tmp.{self.bk_column_name}                     WHERE anchor.{self.bk_column_name} IS NULL                 )                 INSERT INTO {self.table_name} ({self.bk_column_name})                 SELECT {self.bk_column_name} FROM new_values             ;""")             self.log_rowcount = result.rowcount             self.is_success = True             logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружены в якорь')          except Exception as e:             self.exception = str(e).replace('\'', '\'\'')             raise e          finally:             # Удаление временной таблицы             self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")             logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена')              # Логирование             self.log()              # Закрытие подключения             self.engine_dwh.dispose()      def get_anchor_data_type(self):         """         Возвращает тип данных бизнес-ключа якоря         """          query = f"""             SELECT data_type             FROM information_schema.columns             WHERE table_name = '{self.table_name}'               AND column_name = '{self.bk_column_name}';         """          data_type = self.engine_dwh.execute(query).scalar()          if data_type is None:             raise Exception('Не удалось получить тип данных бизнес-ключа якоря, проверь название якоря')         else:             self.data_type = data_type             return data_type      def get_max_anchor_bk(self):         """         Возвращает максимальное значение бизнес-ключа якоря         """         try:             query = f"SELECT MAX({self.bk_column_name}) FROM {self.table_name}"             max_bk = self.engine_dwh.execute(query).scalar()             return max_bk         except Exception as e:             raise e         finally:             self.engine_dwh.dispose()

Класс AnchorAttribute

Класс AnchorAttribute наследуется от AnchorModel. При инициализации необходимо задать значения обязательных параметров:

  • anchor_name — имя якоря, которому принадлежит атрибут.

  • name — имя атрибута якоря без префиксов.

  • is_historized — признак исторического атрибута.

class AnchorAttribute(AnchorModel):
class AnchorAttribute(AnchorModel):     """     Класс для работы с атрибутами якоря       Атрибуты:     [anchor_name] название якоря, к которому относится атрибут (напр. tender)     [name] название атрибута якоря без префиксов (напр. name)     [is_historized] признак исторического атрибута        (в таблицу добавляется поле "valid_from", а в датасете должна быть дата изменения атрибута)       Методы:     [create_attribute] создает таблицу в DWH с нужным названием (напр. te_nam_tender_name)     [update_attribute] принимает название источника и SQL-запрос, обновляет данные в атрибуте     [get_attribute_data_type] возвращает тип данных атрибута (напр. text)     [get_attribute_last_date] получение максимальной даты valid_from для итеративной загрузки исторических атрибутов     [get_max_anchor_bk] получение максимального ID бизнес-ключа якоря     """      def __init__(self, anchor_name: str, name: str, is_historized: bool = False):         super().__init__()          # Название типа таблицы якорной модели         self.type = 'attr'          # Инициализация переменных атрибута         self.name = name          # Инициализация переменных якоря         self.anchor = Anchor(name=anchor_name)         self.anchor_name = self.anchor.name         self.anchor_data_type = self.anchor.get_anchor_data_type()         self.anchor_description = self.anchor.get_description()          # Признак исторического атрибута         self.is_historized = is_historized          # Название таблицы атрибута = первые 2 буквы названия якоря + _ + первые 3 буквы названия атрибута + _ +         # название якоря + _ + название атрибута         self.table_name = (self.anchor_name[:2] + '_' + self.name.replace('_', '')[:3]                            + '_' + self.anchor_name + '_' + self.name)         # Если атрибут исторический, то добавляем _his         if self.is_historized:             self.table_name += '_his'         # Название временной таблицы         self.tmp_table_name = self.table_name + '_tmp'          # Строка для отображения в логах         self.log_string = f'[AnchorAttribute] [{self.name}] ::'      def create_attribute(self, data_type: str, description: str):          self.data_type = data_type         self.description = description          try:              if self.exists():                 return True              # Формируем DDL таблицы в зависимости от того, исторический атрибут или нет             if self.is_historized:                 query_ddl = f"""                     CREATE TABLE {self.table_name} (                         {self.anchor.id_column_name} uuid NOT NULL,                         {self.name} {self.data_type} NOT NULL,                         valid_from timestamp NOT NULL,                         valid_to timestamp,                         insert_ts timestamp with time zone DEFAULT now(), /* UTC */                           CONSTRAINT pk_{self.table_name} PRIMARY KEY ({self.anchor.id_column_name}, valid_from),                         CONSTRAINT fk_{self.table_name}_{self.anchor.id_column_name}                         FOREIGN KEY ({self.anchor.id_column_name})                         REFERENCES {self.anchor.table_name} ({self.anchor.id_column_name})                     );                 """              else:                 query_ddl = f"""                     CREATE TABLE {self.table_name} (                         {self.anchor.id_column_name} uuid NOT NULL,                         {self.name} {self.data_type} NOT NULL,                         insert_ts timestamp with time zone DEFAULT now(), /* UTC */                           CONSTRAINT pk_{self.table_name} PRIMARY KEY ({self.anchor.id_column_name}),                         CONSTRAINT fk_{self.table_name}_{self.anchor.id_column_name}                         FOREIGN KEY ({self.anchor.id_column_name})                         REFERENCES {self.anchor.table_name} ({self.anchor.id_column_name})                     );                 """              self.engine_dwh.execute(query_ddl)              # Добавляем комментарий             self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';")              # Запись в таблицу со всеми таблицами якорной модели             self.engine_dwh.execute(f"""                 DELETE FROM _anchor_model WHERE table_name = '{self.table_name}';                 INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name, is_historized)                 VALUES ('{self.table_name}', 'attr', '{self.name}', '{self.data_type}', '{self.anchor_name}',                         {self.is_historized});             """)              logging.info(f"{self.log_string} Таблица [{self.table_name}] создана")          except:             raise          finally:             # Закрытие подключения             self.engine_dwh.dispose()      def update_attribute(self, source: str, query: str, full_reload: bool = False):         """         Загрузка данных атрибута якоря из источника         Пока обычные атрибуты заливаются путем TRUNCATE-INSERT,         а исторические атрибуты заливаются инкрементально           [source] название БД из функции get_engine         [query] SQL-запрос для выгрузки данных         [full_reload] если поставить True, то атрибут обновится через TRUNCATE         """          try:             self.data_type = self.get_attribute_data_type()             self.description = self.get_description()             self.source = source              # Загрузка и очистка данных             raw_data = sql(db=source, query=query, description=self.description, as_df=True)             data = purify_data(raw_data)              # Если после очистки данных не осталось, то завершаем работу             if data.empty:                 self.is_success = True                 return True              # Создаем временную таблицу             if self.is_historized:                 query_ddl_tmp = f"""                     CREATE TABLE {self.tmp_table_name} (                         {self.anchor.bk_column_name} {self.anchor.data_type} NOT NULL,                         {self.name} {self.data_type} NOT NULL,                         valid_from timestamp NOT NULL,                         valid_to timestamp                     );                 """             else:                 query_ddl_tmp = f"""                     CREATE TABLE {self.tmp_table_name} (                         {self.anchor.bk_column_name} {self.anchor.data_type} NOT NULL,                         {self.name} {self.data_type} NOT NULL                     );                 """              self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")             self.engine_dwh.execute(query_ddl_tmp)             logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]')              # Загрузка данных во временную таблицу             data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append',                         index=False, method='multi', chunksize=1_000)             logging.info(f'{self.log_string} Во временную таблицу загружено [{data.shape[0]:_}] строк')              # Добавление новых значений в якорь             result = self.engine_dwh.execute(f"""                 WITH new_anchor_values AS (                     SELECT tmp.{self.anchor.bk_column_name}                     FROM {self.tmp_table_name} tmp                     LEFT JOIN {self.anchor.table_name} anchor ON tmp.{self.anchor.bk_column_name} = anchor.{self.anchor.bk_column_name}                     WHERE anchor.{self.anchor.bk_column_name} IS NULL                 )                 INSERT INTO {self.anchor.table_name} ({self.anchor.bk_column_name})                 SELECT DISTINCT {self.anchor.bk_column_name} FROM new_anchor_values             ;""")             logging.info(f'{self.log_string} [{result.rowcount:_}] новых строк загружены в якорь')              # Добавление новых значений в атрибут             if full_reload:                 self.engine_dwh.execute(f"TRUNCATE {self.table_name};")              # Если атрибут исторический, то добавляем новые значения             if self.is_historized:                 result = self.engine_dwh.execute(f"""                     MERGE INTO {self.table_name} AS target                     USING {self.tmp_table_name}  AS source                     ON target.{self.anchor.id_column_name} = uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text)                     AND target.valid_from = source.valid_from                       WHEN MATCHED THEN UPDATE                     SET {self.name} = source.{self.name},                         valid_from = source.valid_from                       WHEN NOT MATCHED THEN INSERT                     ({self.anchor.id_column_name}, {self.name}, valid_from)                     VALUES                     (uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text), source.{self.name}, source.valid_from)                 ;""")             # Если атрибут не исторический, то обновляем старые значения и записываем новые (если есть)             else:                 result = self.engine_dwh.execute(f"""                     MERGE INTO {self.table_name} AS target                     USING {self.tmp_table_name}  AS source                     ON target.{self.anchor.id_column_name} = uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text)                       WHEN MATCHED THEN UPDATE                     SET {self.name} = source.{self.name}                       WHEN NOT MATCHED THEN INSERT                     ({self.anchor.id_column_name}, {self.name})                     VALUES                     (uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text), source.{self.name})                 ;""")              self.is_success = True             self.log_rowcount = result.rowcount             logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружено в атрибут')          except Exception as e:             self.exception = str(e).replace('\'', '\'\'')             raise e          finally:             # Удаление временной таблицы             self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")             logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена')              # Закрытие подключения             self.engine_dwh.dispose()              # Запись в лог             self.log()      def get_attribute_data_type(self):         """         Возвращает тип данных атрибута         """          if not self.exists():             raise Exception('Атрибута не существует, сначала надо выполнить create_attribute')          query = f"""             SELECT data_type             FROM information_schema.columns             WHERE table_name = '{self.table_name}'               AND column_name = '{self.name}';         """          data_type = self.engine_dwh.execute(query).scalar()          if data_type is None:             raise Exception('Не удалось получить тип данных атрибута, проверь название атрибута')         else:             self.data_type = data_type             return data_type      def get_attribute_last_date(self):         """         Функция для получения максимальной даты valid_from         для итеративной загрузки исторических атрибутов         """         if self.is_historized:             query = f"SELECT MAX(valid_from) FROM {self.table_name}"             last_date = sql(db=self.engine_dwh, query=query,                             description='Последняя записанная в атрибут дата', as_df=True).iloc[0][0]             if last_date is None:                 last_date = datetime.now() - relativedelta(months=12)             return last_date         else:             raise Exception(f'Нельзя получить последнюю дату данных для неисторического атрибута [{self.name}]')      def get_max_anchor_bk(self):         """         Получение максимального ID бизнес-ключа якоря.         Используется для инкрементальной загрузки данных некоторых сущностей         (например логов и транзакций, где есть инкрементальный id строки)         """         try:             query = f"""                 SELECT MAX(anchor.{self.anchor.bk_column_name})                 FROM {self.anchor.table_name} anchor                 JOIN {self.table_name} attr ON anchor.{self.anchor.id_column_name} = attr.{self.anchor.id_column_name}             """             max_bk = self.engine_dwh.execute(query).scalar()             return max_bk         except Exception as e:             raise e         finally:             self.engine_dwh.dispose()

Особое внимание стоит уделить параметру is_historized. Он определяет дальнейшую судьбу атрибута, а именно: нужно ли сохранять его историю изменения или нет.

  • Если атрибут не является историческим, мы всегда будем видеть только его актуальное состояние. При изменении его значения в системе-источнике актуальное значение будет заменять имеющееся в DWH.

  • Если атрибут является историческим, при создании у него появится дополнительное поле valid_from, и при изменении значения атрибута в системе-источнике в таблице атрибута в DWH появится новая строка с новым значением valid_from.

Такой подход позволяет не засорять хранилище ненужными данными, а также сокращает количество операций и экономит ресурсы.

Методы класса AnchorAttribute:

  • create_attribute — создает таблицу атрибута в DWH.

  • update_attribute — обновляет данные в таблице атрибута.

  • get_attribute_data_type — возвращает тип данных атрибута.

  • get_attribute_last_date — возвращает максимальное значение поля valid_from для итеративной загрузки исторических атрибутов.

  • get_max_anchor_bk — возвращает максимальный ID бизнес-ключа якоря.

Класс Tie

Класс Tie наследуется от AnchorModel. При инициализации необходимо задать значения двух обязательных параметров — anchor1 и anchor2 — имена якорей, находящихся в связи.

Методы класса Tie:

  • create_tie — создает таблицу связи в DWH.

  • update_tie — обновляет данные в таблице связи.

  • get_max_anchor_bk — возвращает максимальный ID бизнес-ключа якоря.

class Tie(AnchorModel):
class Tie(AnchorModel):     """     Класс для работы с таблицами связей между якорями       Атрибуты:     [anchor1_name] имя первого якоря     [anchor2_name] имя второго якоря       Методы:     [create_tie] создает таблицу в DWH для связи (напр. te_re_tender_request)     [update_tie] принимает название источника и SQL-запрос, обновляет данные в связи     [get_max_anchor_bk] получение максимального ID бизнес-ключа якоря     """      def __init__(self, anchor1_name: str, anchor2_name: str):         super().__init__()          # Название типа таблицы якорной модели         self.type = 'tie'          # Создаем объекты якорей, чтобы из них получать нужные переменные         self.anchor1 = Anchor(name=anchor1_name)         self.anchor2 = Anchor(name=anchor2_name)          # Описание         self.description = f'Связь сущности "{self.anchor1.get_description()}" с сущностью "{self.anchor2.get_description()}"'          # Название таблицы связи = первые 2 буквы первого якоря + _ + первые 2 буквы второго якоря + _ +         # название первого якоря + _ + название второго якоря         self.table_name = anchor1_name[:2] + '_' + anchor2_name[:2] + '_' + anchor1_name + '_' + anchor2_name         # Название временной таблицы         self.tmp_table_name = self.table_name + '_tmp'          # Строка для отображения в логах         self.log_string = f'[Tie] :: [{self.table_name}]'      def create_tie(self):         """         Создание таблицы связи якорей (если такой еще нет)         """          try:              if self.exists():                 return True              # Формируем DDL таблицы             query_ddl = f"""                 CREATE TABLE {self.table_name} (                     {self.anchor1.id_column_name} uuid NOT NULL,                     {self.anchor2.id_column_name} uuid NOT NULL,                     insert_ts timestamp with time zone DEFAULT now(),                       CONSTRAINT fk_{self.table_name}_{self.anchor1.id_column_name}                     FOREIGN KEY ({self.anchor1.id_column_name})                     REFERENCES {self.anchor1.table_name} ({self.anchor1.id_column_name}),                       CONSTRAINT fk_{self.table_name}_{self.anchor2.id_column_name}                     FOREIGN KEY ({self.anchor2.id_column_name})                     REFERENCES {self.anchor2.table_name} ({self.anchor2.id_column_name})                 );                 CREATE INDEX {self.table_name}_{self.anchor1.id_column_name}_index                   ON {self.table_name} ({self.anchor1.id_column_name});                 CREATE INDEX {self.table_name}_{self.anchor2.id_column_name}_index                   ON {self.table_name} ({self.anchor2.id_column_name});             """             self.engine_dwh.execute(query_ddl)              # Добавляем комментарий             self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';")              # Запись в таблицу со всеми таблицами якорной модели             self.engine_dwh.execute(f"""                 DELETE FROM _anchor_model WHERE table_name = '{self.table_name}';                 INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name)                 VALUES ('{self.table_name}', 'tie', '{self.anchor2.name}', NULL, '{self.anchor1.name}')             """)              logging.info(f"{self.log_string} Таблица [{self.table_name}] создана")          except:             raise          finally:             # Закрытие подключения             self.engine_dwh.dispose()      def update_tie(self, source: str, query:str, full_reload: bool = False):         """         Загрузка данных в таблицу связей (Tie)         Данные для таблицы связей мы получаем в виде бизнес-ключей двух якорей         """         try:             self.create_tie()             self.source = source              # Загрузка и очистка данных             raw_data = sql(db=source, query=query, description=self.description, as_df=True)             data = purify_data(raw_data)              # Если после очистки данных не осталось, то завершаем работу             if data.empty:                 self.is_success = True                 return True              # Создаем временную таблицу             self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")             self.engine_dwh.execute(f"""                 CREATE TABLE {self.tmp_table_name} (                     {self.anchor1.bk_column_name} {self.anchor1.get_anchor_data_type()} NOT NULL,                     {self.anchor2.bk_column_name} {self.anchor2.get_anchor_data_type()} NOT NULL                 );             """)             logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]')              # Загрузка данных во временную таблицу             data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append', index=False,                         method='multi', chunksize=1000)             logging.info(f'{self.log_string} [{data.shape[0]:_}] строк загружено во временную таблицу')              # Добавление новых значений в якорь             result = self.engine_dwh.execute(f"""                 WITH new_anchor_values AS (                     SELECT tmp.{self.anchor2.bk_column_name}                     FROM {self.tmp_table_name} tmp                     LEFT JOIN {self.anchor2.table_name} anchor ON tmp.{self.anchor2.bk_column_name} = anchor.{self.anchor2.bk_column_name}                     WHERE anchor.{self.anchor2.bk_column_name} IS NULL                 )                 INSERT INTO {self.anchor2.table_name} ({self.anchor2.bk_column_name})                 SELECT DISTINCT {self.anchor2.bk_column_name}                 FROM new_anchor_values             ;""")             logging.info(f'{self.log_string} [{result.rowcount:_}] новых строк загружены в якорь')              # Наполнение новыми данными              # Если хотим полностью обновить связи, то делаем сначала TRUNCATE             if full_reload:                 query = f"TRUNCATE TABLE {self.table_name};"                 self.engine_dwh.execute(query)              query = f"""                 WITH new_values AS (                     SELECT anc1.{self.anchor1.id_column_name},                            anc2.{self.anchor2.id_column_name}                     FROM {self.tmp_table_name} tmp                     JOIN {self.anchor1.table_name} anc1 ON tmp.{self.anchor1.bk_column_name} = anc1.{self.anchor1.bk_column_name}                     JOIN {self.anchor2.table_name} anc2 ON tmp.{self.anchor2.bk_column_name} = anc2.{self.anchor2.bk_column_name}                     LEFT JOIN {self.table_name}     tie ON anc1.{self.anchor1.id_column_name} = tie.{self.anchor1.id_column_name}                                                        AND anc2.{self.anchor2.id_column_name} = tie.{self.anchor2.id_column_name}                     WHERE tie.{self.anchor1.id_column_name} IS NULL                        OR tie.{self.anchor2.id_column_name} IS NULL                 )                 INSERT INTO {self.table_name} ({self.anchor1.id_column_name}, {self.anchor2.id_column_name})                 SELECT {self.anchor1.id_column_name}, {self.anchor2.id_column_name}                 FROM new_values;             """             result = self.engine_dwh.execute(query)             self.is_success = True             self.log_rowcount = result.rowcount             logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружены в связь')          except Exception as e:             self.is_success = False             self.exception = str(e).replace('\'', '\'\'')             raise e          finally:             # Удаление временной таблицы             self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};")             logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена')              # Закрытие подключения             self.engine_dwh.dispose()              # Запись в лог             self.log()      def get_max_anchor_bk(self):         """         Получение максимального ID бизнес-ключа якоря.         Используется для инкрементальной загрузки данных некоторых сущностей         (например логов и транзакций, где есть инкрементальный id строки)         """         try:             query = f"""                 SELECT MAX(anchor.{self.anchor1.bk_column_name})                 FROM {self.anchor1.table_name} anchor                 JOIN {self.table_name} tie ON anchor.{self.anchor1.id_column_name} = tie.{self.anchor1.id_column_name}             """             max_bk = self.engine_dwh.execute(query).scalar()             return max_bk         except Exception as e:             raise e         finally:             self.engine_dwh.dispose()

Шаг 3. Пробуем применять

Итак, представим задачу. Нам нужно реализовать поставку данных из Jira, а конкретно объекта Sprint, для отчета, демонстрирующего минимальный, медианный и максимальный Lead Time у разных команд разработки, а также его динамику по месяцам.

Пункт 1. Импортируем все наши классы, прописываем аргументы DAG и начинаем создавать задачи. Сначала инициализируем якорь jira_sprint, прописываем создание якоря, базовый запрос и обновление якоря.

Пример DAG (загрузка якоря)
from airflow.models import DAG from airflow.decorators import task from datetime import timedelta   from tools.smnx_data_tools import task_fail_alert from tools.anchor_model import Anchor, AnchorAttribute, Tie   args = {     'owner': 'owner_name',     'start_date': '2024-09-04',     'depends_on_past': False,     'catchup': False,     'retries': 1,     'retry_delay': timedelta(minutes=30),     'on_failure_callback': task_fail_alert }     @task def anc_jira_sprint():     query = """         SELECT "ID" AS jira_sprint_bk         FROM "AO_60DB71_SPRINT"     """      anchor = Anchor(name='jira_sprint')     anchor.create_anchor(data_type='integer', description='Jira спринт')     anchor.update_anchor(source='jira_db', query=query)

Пункт 2. Определяем атрибуты спринта. В нашем случае это название, дата начала и дата завершения. Прописываем базовый запрос, инициализируем атрибуты, создаем и обновляем их. Повторяем для каждого атрибута якоря.

Пример DAG (загрузка атрибутов якоря)
@task def attr_name():     query = """         SELECT "ID"   AS jira_sprint_bk,                "NAME" AS name         FROM "AO_60DB71_SPRINT"     """      anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='name')     anchor_attribute.create_attribute(data_type='text', description='Название')     anchor_attribute.update_attribute(source='jira_db', query=query)     @task def attr_started():     query = """         SELECT "ID"                            AS jira_sprint_bk,                to_timestamp("START_DATE"/1000) AS started         FROM "AO_60DB71_SPRINT"     """      anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='started')     anchor_attribute.create_attribute(data_type='timestamp with time zone', description='')     anchor_attribute.update_attribute(source='jira_db', query=query)     @task def attr_completed():     query = """         SELECT "ID"                               AS jira_sprint_bk,                to_timestamp("COMPLETE_DATE"/1000) AS completed         FROM "AO_60DB71_SPRINT"     """      anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='completed')     anchor_attribute.create_attribute(data_type='', description='')     anchor_attribute.update_attribute(source='jira_db', query=query)

Пункт 3. Определяем связи с другими якорями. В нашем случае jira_sprint имеет связь с jira_issue, так как нам важно понимать, к какому спринту относится задача. Прописываем базовый запрос, инициализируем связь, создаем и обновляем её.

Пример DAG (загрузка связи)
@task def tie_jira_issue():     query = """         SELECT "ID" AS jira_sprint_bk,                i.id AS jira_issue_bk         FROM "AO_60DB71_SPRINT" s         JOIN customfieldvalue v ON v.customfield = '10101' /* sprint */                                AND v.stringvalue = s."ID"::text         JOIN jiraissue        i ON v.issue = i.id     """      tie = Tie(anchor1_name='jira_sprint', anchor2_name='jira_issue')     tie.create_tie()     tie.update_tie(source='jira_db', query=query)

Пункт 4. Оборачиваем полученные задачи в DAG и ставим на регулярный запуск.

Пример DAG (определение свойств DAG, установление последовательности задач)
with DAG(dag_id='am_jira_sprint',          default_args=args,          schedule_interval=timedelta(hours=1),          tags=['AnchorModel'],          max_active_runs=1) as dag:           anc_jira_sprint() >> attr_name() >> attr_started() >> attr_completed() >> tie_jira_issue()

Шаг 4. Profit — подводим итоги

С помощью anchor_model.py мы организовали поставку данных, необходимых для отчета, за пару часов и избежали ошибок в именах объектов. По сути, вся работа инженера свелась к определению имени якоря и его атрибутов, написанию нескольких строк кода и базовых запросов. Время разработки сокращено более чем на 50%, а количество ошибок уменьшилось более чем на 95% по сравнению с полностью ручной разработкой поставки данных.

Благодаря предсказуемости имен объектов в DWH, BI-разработчик может приступить к разработке витрин данных, необходимых для создания отчетов, сразу же после первой отработки DAG. Это, в свою очередь, ускоряет разработку отчета, и бизнес быстрее получает необходимые ему наглядные визуализации данных.

Подобная автоматизация позволяет нескольким инженерам выполнить несколько поставок данных в течение одного спринта и обеспечить работой BI-разработчиков на несколько спринтов вперед.

Для демонстрации работы мы выбрали небольшой якорь с тремя атрибутами и одной связью. Конечно, в реальной работе будут встречаться якоря с гораздо большим количеством атрибутов и связей, но принципиально это ничего не меняет.

Мы будем и дальше улучшать работу модуля anchor_model.py, о чём обязательно расскажем в продолжении цикла статей о разработке BI-системы. Оставайтесь с нами, пишите своё мнение, советы и критику в комментариях. До скорых встреч!


ссылка на оригинал статьи https://habr.com/ru/articles/849938/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *