Привет, друзья!
Продолжаем нашу серию статей о создании BI-системы в компании Sminex. В первой части мы рассказали, что в качестве основного места хранения аналитических данных используется хранилище с якорной моделью. Она характеризуется высокой нормализацией и строгими правилами наименования объектов.
Строгие правила наименования лучше сразу прописать в соглашении об именовании объектов. В нашем случае мы использовали базовое соглашение [Anchor Modeling: Naming Convention] с небольшими доработками и упрощениями.
Однако, чтобы добиться высокой нормализации, каждый объект, независимо от его типа, нужно выделять в отдельную таблицу, прописывать для него DDL и ключи. Учитывая количество объектов, это превращается в огромную рутинную работу. При этом нельзя допускать ситуации «сейчас быстро сделаем, а потом поправим». Это идеальная область для автоматизации в условиях ограничения ресурсов.
Важно!
Мы не претендуем на абсолютную правильность реализации, оптимальность и соответствие нашего решения всем общепринятым правилам. Конкретно в нашей ситуации, с нашими ресурсами и объёмом задач от заказчиков, это решение удалось реализовать достаточно быстро, и оно начало приносить свои плоды. Мы будем рады услышать конструктивную критику по поводу решения. Самое полезное опубликуем в следующей статье!
Шаг 1. Как облегчить работу инженерам по выполнению поставок данных
Необходимо:
-
Придумать
велосипедменее затратный способ укладки данных в якорную модель по сравнению с ручным созданием и настройкой таблиц. -
Разработать способ автоматической генерации однотипных SQL-запросов, исключающий ошибки в наименовании объектов и в логике работы с данными.
-
Сохранять логи о совершенных действиях.
Шаг 2. ООП нам поможет
Для реализации задуманного мы разработали Python-модуль anchor_model.py
, в котором создали классы и наделили их необходимыми методами. Давайте познакомимся с ними поближе.
Класс AnchorModel
Этот класс является суперклассом. Все остальные классы наследуются от него. Он содержит три метода:
-
exists
— проверяет, существует ли уже такая таблица в DWH. -
get_description
— возвращает описание таблицы. -
log
— записывает в логи создание или наполнение таблицы якорной модели.
class AnchorModel:
class AnchorModel: """ Суперкласс для работы с таблицами якорной модели Все остальные классы наследуются от него Методы: [exists] проверяет, существует ли уже такая таблица в DWH [get_description] возвращает описание таблицы (комментарий) [log] логирует обновление данных """ def __init__(self): # Подключение к DWH self.engine_dwh = get_engine('dwh') # Название таблицы self.table_name = None # Источник данных self.source = None # Тип данных (задается через create или get_data_type) self.data_type = None # Описание (задается через create или get_description) self.description = None # Тип данных поля инкремента self.increment = None # Переменные для логирования self.log_string = '' self.log_rowcount = 0 self.is_success = False self.exception = '' def exists(self): """ Проверка на то, что такая таблица уже существует в DWH """ try: query = f""" SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_name = '{self.table_name}' ); """ exists = self.engine_dwh.execute(query).scalar() if exists: logging.info(f"{self.log_string} Таблица [{self.table_name}] уже существует") return True else: logging.info(f"{self.log_string} Таблицы [{self.table_name}] не существует") return False except: raise finally: # Закрытие подключения self.engine_dwh.dispose() def get_description(self): """ Возвращает описание (комментарий к таблице) """ try: query = f""" SELECT d.description AS table_comment FROM pg_class c JOIN pg_description d ON c.oid = d.objoid AND d.objsubid = 0 /* уровень 0 = комментарии к таблицам, не к колонкам) */ WHERE c.relname = '{self.table_name}' """ description = self.engine_dwh.execute(query).scalar() if description is None: raise Exception(f'Не удалось получить описание таблицы {self.table_name}, ' f'проверь название и что у нее есть комментарий') else: self.description = description return self.description except: raise finally: # Закрытие подключения self.engine_dwh.dispose() def log(self): """ Логирует обновление данных """ try: query = f""" INSERT INTO _log (name, source, rows, is_success, exception) VALUES ('{self.table_name}', '{self.source}', {self.log_rowcount}, {self.is_success}, '{self.exception}') ;""" self.engine_dwh.execute(query) except: raise finally: # Закрытие подключения self.engine_dwh.dispose()
Класс Anchor
Класс Anchor
наследуется от AnchorModel
. При инициализации необходимо задать обязательный параметр name
— имя якоря. В классе Anchor
предусмотрены следующие методы:
-
create_anchor
— создает таблицу якоря в DWH. -
update_anchor
— обновляет данные в якоре. -
get_anchor_data_type
— возвращает тип данных бизнес-ключа существующего якоря. -
get_max_anchor_bk
— возвращает максимальный ID бизнес-ключа якоря.
class Anchor(AnchorModel):
class Anchor(AnchorModel): """ Класс для работы с якорем Атрибуты: [name] название якоря. При инициализации задается без префикса, напр. tender или document Методы: [create_anchor] создает таблицу в DWH с нужным префиксом и триггером для генерации суррогатного ключа [update_anchor] принимает название источника и SQL-запрос, обновляет данные в якоре [get_anchor_data_type] возвращает тип данных бизнес-ключа существующего якоря (напр. uuid) [get_max_anchor_bk] возвращает максимальное значение бизнес-ключа """ def __init__(self, name: str): super().__init__() # Название типа таблицы якорной модели self.type = 'anchor' # Название сущности self.name = name # Название таблицы = первые два символа названия + _ + название (напр. для tender - te_tender) self.table_name = self.name[:2] + '_' + self.name # Название временной таблицы self.tmp_table_name = self.table_name + '_tmp' # Название колонки суррогатного ключа = название + _ + id (напр. для tender - tender_id) self.id_column_name = self.name + '_id' # Название колонки бизнес ключа = название + _ + bk (напр. для tender - tender_bk) self.bk_column_name = self.name + '_bk' # Строка для отображения в логах консоли self.log_string = f'[Anchor] [{self.name}] ::' def create_anchor(self, data_type: str, description: str, increment: str = None): """ Функция для создания таблицы якоря в DWH [data_type] тип данных якоря [description] описание (комментарий к таблице) [increment] тип данных инкремента. Если указан: - заменяет колонку insert_ts и хранит в ней значения на момент обновления сущности - при загрузке новых данных кроме ключа якоря в запросе также должно фигурировать поле инкремента """ try: if self.exists(): return True self.data_type = data_type self.description = description self.increment = increment # Создаем таблицу якоря if self.increment: self.engine_dwh.execute(f""" CREATE TABLE {self.table_name} ( {self.id_column_name} uuid PRIMARY KEY, {self.bk_column_name} {self.data_type} NOT NULL UNIQUE, increment {self.increment} NOT NULL ); """) else: self.engine_dwh.execute(f""" CREATE TABLE {self.table_name} ( {self.id_column_name} uuid PRIMARY KEY, {self.bk_column_name} {self.data_type} NOT NULL UNIQUE, insert_ts timestamp with time zone DEFAULT now() ); """) # Добавляем комментарий self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';") # Создаем триггер для генерации суррогатного ключа при добавлении нового бизнес-ключа self.engine_dwh.execute(f""" CREATE OR REPLACE FUNCTION generate_{self.id_column_name}() RETURNS TRIGGER AS $$ BEGIN NEW.{self.id_column_name} := uuid_generate_v5(uuid_nil(), NEW.{self.bk_column_name}::text); RETURN NEW; END; $$ LANGUAGE plpgsql; """) # Триггер self.engine_dwh.execute(f""" CREATE TRIGGER {self.table_name}_trigger BEFORE INSERT ON {self.table_name} FOR EACH ROW EXECUTE PROCEDURE generate_{self.id_column_name}(); """) # Запись в реестр всех таблиц якорной модели. # Из нее потом генерируется документация и схема хранилища. self.engine_dwh.execute(f""" DELETE FROM _anchor_model WHERE table_name = '{self.table_name}'; INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name) VALUES ('{self.table_name}', 'anchor', '{self.name}', '{self.data_type}', '{self.name}') ;""") logging.info( f"{self.log_string} Таблица [{self.table_name}] создана с триггером на суррогатный ключ {self.id_column_name}") except: raise finally: # Закрытие подключения self.engine_dwh.dispose() def update_anchor(self, source: str, query: str, increment: bool = False): """ Загрузка данных якоря из источника [source] название БД (как в Airflow Connections) [query] запрос для выгрузки нужных данных """ try: self.data_type = self.get_anchor_data_type() self.description = self.get_description() self.source = source # Загрузка и очистка данных raw_data = sql(db=source, query=query, description=self.description, as_df=True) data = purify_data(data=raw_data) # Если после очистки данных не осталось, то завершаем работу # Логируем в блоке finally if data.empty: self.is_success = True return True # Начинаем загрузку данных в якорь # Создаем временную таблицу self.engine_dwh.execute(f""" DROP TABLE IF EXISTS {self.tmp_table_name};""") if increment: self.engine_dwh.execute(f""" CREATE TABLE {self.tmp_table_name} ( {self.bk_column_name} {self.data_type} NOT NULL PRIMARY KEY );""") else: self.engine_dwh.execute(f""" CREATE TABLE {self.tmp_table_name} ( {self.bk_column_name} {self.data_type} NOT NULL PRIMARY KEY );""") logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]') # Загрузка данных во временную таблицу data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append', index=False, method='multi', chunksize=1000) logging.info(f'Во временную таблицу загружено [{data.shape[0]:_}] строк') # Наполнение якоря новыми данными result = self.engine_dwh.execute(f""" WITH new_values AS ( SELECT tmp.{self.bk_column_name} FROM {self.tmp_table_name} tmp LEFT JOIN {self.table_name} anchor ON anchor.{self.bk_column_name} = tmp.{self.bk_column_name} WHERE anchor.{self.bk_column_name} IS NULL ) INSERT INTO {self.table_name} ({self.bk_column_name}) SELECT {self.bk_column_name} FROM new_values ;""") self.log_rowcount = result.rowcount self.is_success = True logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружены в якорь') except Exception as e: self.exception = str(e).replace('\'', '\'\'') raise e finally: # Удаление временной таблицы self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};") logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена') # Логирование self.log() # Закрытие подключения self.engine_dwh.dispose() def get_anchor_data_type(self): """ Возвращает тип данных бизнес-ключа якоря """ query = f""" SELECT data_type FROM information_schema.columns WHERE table_name = '{self.table_name}' AND column_name = '{self.bk_column_name}'; """ data_type = self.engine_dwh.execute(query).scalar() if data_type is None: raise Exception('Не удалось получить тип данных бизнес-ключа якоря, проверь название якоря') else: self.data_type = data_type return data_type def get_max_anchor_bk(self): """ Возвращает максимальное значение бизнес-ключа якоря """ try: query = f"SELECT MAX({self.bk_column_name}) FROM {self.table_name}" max_bk = self.engine_dwh.execute(query).scalar() return max_bk except Exception as e: raise e finally: self.engine_dwh.dispose()
Класс AnchorAttribute
Класс AnchorAttribute
наследуется от AnchorModel
. При инициализации необходимо задать значения обязательных параметров:
-
anchor_name
— имя якоря, которому принадлежит атрибут. -
name
— имя атрибута якоря без префиксов. -
is_historized
— признак исторического атрибута.
class AnchorAttribute(AnchorModel):
class AnchorAttribute(AnchorModel): """ Класс для работы с атрибутами якоря Атрибуты: [anchor_name] название якоря, к которому относится атрибут (напр. tender) [name] название атрибута якоря без префиксов (напр. name) [is_historized] признак исторического атрибута (в таблицу добавляется поле "valid_from", а в датасете должна быть дата изменения атрибута) Методы: [create_attribute] создает таблицу в DWH с нужным названием (напр. te_nam_tender_name) [update_attribute] принимает название источника и SQL-запрос, обновляет данные в атрибуте [get_attribute_data_type] возвращает тип данных атрибута (напр. text) [get_attribute_last_date] получение максимальной даты valid_from для итеративной загрузки исторических атрибутов [get_max_anchor_bk] получение максимального ID бизнес-ключа якоря """ def __init__(self, anchor_name: str, name: str, is_historized: bool = False): super().__init__() # Название типа таблицы якорной модели self.type = 'attr' # Инициализация переменных атрибута self.name = name # Инициализация переменных якоря self.anchor = Anchor(name=anchor_name) self.anchor_name = self.anchor.name self.anchor_data_type = self.anchor.get_anchor_data_type() self.anchor_description = self.anchor.get_description() # Признак исторического атрибута self.is_historized = is_historized # Название таблицы атрибута = первые 2 буквы названия якоря + _ + первые 3 буквы названия атрибута + _ + # название якоря + _ + название атрибута self.table_name = (self.anchor_name[:2] + '_' + self.name.replace('_', '')[:3] + '_' + self.anchor_name + '_' + self.name) # Если атрибут исторический, то добавляем _his if self.is_historized: self.table_name += '_his' # Название временной таблицы self.tmp_table_name = self.table_name + '_tmp' # Строка для отображения в логах self.log_string = f'[AnchorAttribute] [{self.name}] ::' def create_attribute(self, data_type: str, description: str): self.data_type = data_type self.description = description try: if self.exists(): return True # Формируем DDL таблицы в зависимости от того, исторический атрибут или нет if self.is_historized: query_ddl = f""" CREATE TABLE {self.table_name} ( {self.anchor.id_column_name} uuid NOT NULL, {self.name} {self.data_type} NOT NULL, valid_from timestamp NOT NULL, valid_to timestamp, insert_ts timestamp with time zone DEFAULT now(), /* UTC */ CONSTRAINT pk_{self.table_name} PRIMARY KEY ({self.anchor.id_column_name}, valid_from), CONSTRAINT fk_{self.table_name}_{self.anchor.id_column_name} FOREIGN KEY ({self.anchor.id_column_name}) REFERENCES {self.anchor.table_name} ({self.anchor.id_column_name}) ); """ else: query_ddl = f""" CREATE TABLE {self.table_name} ( {self.anchor.id_column_name} uuid NOT NULL, {self.name} {self.data_type} NOT NULL, insert_ts timestamp with time zone DEFAULT now(), /* UTC */ CONSTRAINT pk_{self.table_name} PRIMARY KEY ({self.anchor.id_column_name}), CONSTRAINT fk_{self.table_name}_{self.anchor.id_column_name} FOREIGN KEY ({self.anchor.id_column_name}) REFERENCES {self.anchor.table_name} ({self.anchor.id_column_name}) ); """ self.engine_dwh.execute(query_ddl) # Добавляем комментарий self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';") # Запись в таблицу со всеми таблицами якорной модели self.engine_dwh.execute(f""" DELETE FROM _anchor_model WHERE table_name = '{self.table_name}'; INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name, is_historized) VALUES ('{self.table_name}', 'attr', '{self.name}', '{self.data_type}', '{self.anchor_name}', {self.is_historized}); """) logging.info(f"{self.log_string} Таблица [{self.table_name}] создана") except: raise finally: # Закрытие подключения self.engine_dwh.dispose() def update_attribute(self, source: str, query: str, full_reload: bool = False): """ Загрузка данных атрибута якоря из источника Пока обычные атрибуты заливаются путем TRUNCATE-INSERT, а исторические атрибуты заливаются инкрементально [source] название БД из функции get_engine [query] SQL-запрос для выгрузки данных [full_reload] если поставить True, то атрибут обновится через TRUNCATE """ try: self.data_type = self.get_attribute_data_type() self.description = self.get_description() self.source = source # Загрузка и очистка данных raw_data = sql(db=source, query=query, description=self.description, as_df=True) data = purify_data(raw_data) # Если после очистки данных не осталось, то завершаем работу if data.empty: self.is_success = True return True # Создаем временную таблицу if self.is_historized: query_ddl_tmp = f""" CREATE TABLE {self.tmp_table_name} ( {self.anchor.bk_column_name} {self.anchor.data_type} NOT NULL, {self.name} {self.data_type} NOT NULL, valid_from timestamp NOT NULL, valid_to timestamp ); """ else: query_ddl_tmp = f""" CREATE TABLE {self.tmp_table_name} ( {self.anchor.bk_column_name} {self.anchor.data_type} NOT NULL, {self.name} {self.data_type} NOT NULL ); """ self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};") self.engine_dwh.execute(query_ddl_tmp) logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]') # Загрузка данных во временную таблицу data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append', index=False, method='multi', chunksize=1_000) logging.info(f'{self.log_string} Во временную таблицу загружено [{data.shape[0]:_}] строк') # Добавление новых значений в якорь result = self.engine_dwh.execute(f""" WITH new_anchor_values AS ( SELECT tmp.{self.anchor.bk_column_name} FROM {self.tmp_table_name} tmp LEFT JOIN {self.anchor.table_name} anchor ON tmp.{self.anchor.bk_column_name} = anchor.{self.anchor.bk_column_name} WHERE anchor.{self.anchor.bk_column_name} IS NULL ) INSERT INTO {self.anchor.table_name} ({self.anchor.bk_column_name}) SELECT DISTINCT {self.anchor.bk_column_name} FROM new_anchor_values ;""") logging.info(f'{self.log_string} [{result.rowcount:_}] новых строк загружены в якорь') # Добавление новых значений в атрибут if full_reload: self.engine_dwh.execute(f"TRUNCATE {self.table_name};") # Если атрибут исторический, то добавляем новые значения if self.is_historized: result = self.engine_dwh.execute(f""" MERGE INTO {self.table_name} AS target USING {self.tmp_table_name} AS source ON target.{self.anchor.id_column_name} = uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text) AND target.valid_from = source.valid_from WHEN MATCHED THEN UPDATE SET {self.name} = source.{self.name}, valid_from = source.valid_from WHEN NOT MATCHED THEN INSERT ({self.anchor.id_column_name}, {self.name}, valid_from) VALUES (uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text), source.{self.name}, source.valid_from) ;""") # Если атрибут не исторический, то обновляем старые значения и записываем новые (если есть) else: result = self.engine_dwh.execute(f""" MERGE INTO {self.table_name} AS target USING {self.tmp_table_name} AS source ON target.{self.anchor.id_column_name} = uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text) WHEN MATCHED THEN UPDATE SET {self.name} = source.{self.name} WHEN NOT MATCHED THEN INSERT ({self.anchor.id_column_name}, {self.name}) VALUES (uuid_generate_v5(uuid_nil(), source.{self.anchor.bk_column_name}::text), source.{self.name}) ;""") self.is_success = True self.log_rowcount = result.rowcount logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружено в атрибут') except Exception as e: self.exception = str(e).replace('\'', '\'\'') raise e finally: # Удаление временной таблицы self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};") logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена') # Закрытие подключения self.engine_dwh.dispose() # Запись в лог self.log() def get_attribute_data_type(self): """ Возвращает тип данных атрибута """ if not self.exists(): raise Exception('Атрибута не существует, сначала надо выполнить create_attribute') query = f""" SELECT data_type FROM information_schema.columns WHERE table_name = '{self.table_name}' AND column_name = '{self.name}'; """ data_type = self.engine_dwh.execute(query).scalar() if data_type is None: raise Exception('Не удалось получить тип данных атрибута, проверь название атрибута') else: self.data_type = data_type return data_type def get_attribute_last_date(self): """ Функция для получения максимальной даты valid_from для итеративной загрузки исторических атрибутов """ if self.is_historized: query = f"SELECT MAX(valid_from) FROM {self.table_name}" last_date = sql(db=self.engine_dwh, query=query, description='Последняя записанная в атрибут дата', as_df=True).iloc[0][0] if last_date is None: last_date = datetime.now() - relativedelta(months=12) return last_date else: raise Exception(f'Нельзя получить последнюю дату данных для неисторического атрибута [{self.name}]') def get_max_anchor_bk(self): """ Получение максимального ID бизнес-ключа якоря. Используется для инкрементальной загрузки данных некоторых сущностей (например логов и транзакций, где есть инкрементальный id строки) """ try: query = f""" SELECT MAX(anchor.{self.anchor.bk_column_name}) FROM {self.anchor.table_name} anchor JOIN {self.table_name} attr ON anchor.{self.anchor.id_column_name} = attr.{self.anchor.id_column_name} """ max_bk = self.engine_dwh.execute(query).scalar() return max_bk except Exception as e: raise e finally: self.engine_dwh.dispose()
Особое внимание стоит уделить параметру is_historized
. Он определяет дальнейшую судьбу атрибута, а именно: нужно ли сохранять его историю изменения или нет.
-
Если атрибут не является историческим, мы всегда будем видеть только его актуальное состояние. При изменении его значения в системе-источнике актуальное значение будет заменять имеющееся в DWH.
-
Если атрибут является историческим, при создании у него появится дополнительное поле
valid_from
, и при изменении значения атрибута в системе-источнике в таблице атрибута в DWH появится новая строка с новым значениемvalid_from
.
Такой подход позволяет не засорять хранилище ненужными данными, а также сокращает количество операций и экономит ресурсы.
Методы класса AnchorAttribute
:
-
create_attribute
— создает таблицу атрибута в DWH. -
update_attribute
— обновляет данные в таблице атрибута. -
get_attribute_data_type
— возвращает тип данных атрибута. -
get_attribute_last_date
— возвращает максимальное значение поляvalid_from
для итеративной загрузки исторических атрибутов. -
get_max_anchor_bk
— возвращает максимальный ID бизнес-ключа якоря.
Класс Tie
Класс Tie
наследуется от AnchorModel
. При инициализации необходимо задать значения двух обязательных параметров — anchor1
и anchor2
— имена якорей, находящихся в связи.
Методы класса Tie
:
-
create_tie
— создает таблицу связи в DWH. -
update_tie
— обновляет данные в таблице связи. -
get_max_anchor_bk
— возвращает максимальный ID бизнес-ключа якоря.
class Tie(AnchorModel):
class Tie(AnchorModel): """ Класс для работы с таблицами связей между якорями Атрибуты: [anchor1_name] имя первого якоря [anchor2_name] имя второго якоря Методы: [create_tie] создает таблицу в DWH для связи (напр. te_re_tender_request) [update_tie] принимает название источника и SQL-запрос, обновляет данные в связи [get_max_anchor_bk] получение максимального ID бизнес-ключа якоря """ def __init__(self, anchor1_name: str, anchor2_name: str): super().__init__() # Название типа таблицы якорной модели self.type = 'tie' # Создаем объекты якорей, чтобы из них получать нужные переменные self.anchor1 = Anchor(name=anchor1_name) self.anchor2 = Anchor(name=anchor2_name) # Описание self.description = f'Связь сущности "{self.anchor1.get_description()}" с сущностью "{self.anchor2.get_description()}"' # Название таблицы связи = первые 2 буквы первого якоря + _ + первые 2 буквы второго якоря + _ + # название первого якоря + _ + название второго якоря self.table_name = anchor1_name[:2] + '_' + anchor2_name[:2] + '_' + anchor1_name + '_' + anchor2_name # Название временной таблицы self.tmp_table_name = self.table_name + '_tmp' # Строка для отображения в логах self.log_string = f'[Tie] :: [{self.table_name}]' def create_tie(self): """ Создание таблицы связи якорей (если такой еще нет) """ try: if self.exists(): return True # Формируем DDL таблицы query_ddl = f""" CREATE TABLE {self.table_name} ( {self.anchor1.id_column_name} uuid NOT NULL, {self.anchor2.id_column_name} uuid NOT NULL, insert_ts timestamp with time zone DEFAULT now(), CONSTRAINT fk_{self.table_name}_{self.anchor1.id_column_name} FOREIGN KEY ({self.anchor1.id_column_name}) REFERENCES {self.anchor1.table_name} ({self.anchor1.id_column_name}), CONSTRAINT fk_{self.table_name}_{self.anchor2.id_column_name} FOREIGN KEY ({self.anchor2.id_column_name}) REFERENCES {self.anchor2.table_name} ({self.anchor2.id_column_name}) ); CREATE INDEX {self.table_name}_{self.anchor1.id_column_name}_index ON {self.table_name} ({self.anchor1.id_column_name}); CREATE INDEX {self.table_name}_{self.anchor2.id_column_name}_index ON {self.table_name} ({self.anchor2.id_column_name}); """ self.engine_dwh.execute(query_ddl) # Добавляем комментарий self.engine_dwh.execute(f"COMMENT ON TABLE {self.table_name} IS '{self.description}';") # Запись в таблицу со всеми таблицами якорной модели self.engine_dwh.execute(f""" DELETE FROM _anchor_model WHERE table_name = '{self.table_name}'; INSERT INTO _anchor_model (table_name, type, name, data_type, anchor_name) VALUES ('{self.table_name}', 'tie', '{self.anchor2.name}', NULL, '{self.anchor1.name}') """) logging.info(f"{self.log_string} Таблица [{self.table_name}] создана") except: raise finally: # Закрытие подключения self.engine_dwh.dispose() def update_tie(self, source: str, query:str, full_reload: bool = False): """ Загрузка данных в таблицу связей (Tie) Данные для таблицы связей мы получаем в виде бизнес-ключей двух якорей """ try: self.create_tie() self.source = source # Загрузка и очистка данных raw_data = sql(db=source, query=query, description=self.description, as_df=True) data = purify_data(raw_data) # Если после очистки данных не осталось, то завершаем работу if data.empty: self.is_success = True return True # Создаем временную таблицу self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};") self.engine_dwh.execute(f""" CREATE TABLE {self.tmp_table_name} ( {self.anchor1.bk_column_name} {self.anchor1.get_anchor_data_type()} NOT NULL, {self.anchor2.bk_column_name} {self.anchor2.get_anchor_data_type()} NOT NULL ); """) logging.info(f'{self.log_string} Создана временная таблица [{self.tmp_table_name}]') # Загрузка данных во временную таблицу data.to_sql(name=self.tmp_table_name, con=self.engine_dwh, if_exists='append', index=False, method='multi', chunksize=1000) logging.info(f'{self.log_string} [{data.shape[0]:_}] строк загружено во временную таблицу') # Добавление новых значений в якорь result = self.engine_dwh.execute(f""" WITH new_anchor_values AS ( SELECT tmp.{self.anchor2.bk_column_name} FROM {self.tmp_table_name} tmp LEFT JOIN {self.anchor2.table_name} anchor ON tmp.{self.anchor2.bk_column_name} = anchor.{self.anchor2.bk_column_name} WHERE anchor.{self.anchor2.bk_column_name} IS NULL ) INSERT INTO {self.anchor2.table_name} ({self.anchor2.bk_column_name}) SELECT DISTINCT {self.anchor2.bk_column_name} FROM new_anchor_values ;""") logging.info(f'{self.log_string} [{result.rowcount:_}] новых строк загружены в якорь') # Наполнение новыми данными # Если хотим полностью обновить связи, то делаем сначала TRUNCATE if full_reload: query = f"TRUNCATE TABLE {self.table_name};" self.engine_dwh.execute(query) query = f""" WITH new_values AS ( SELECT anc1.{self.anchor1.id_column_name}, anc2.{self.anchor2.id_column_name} FROM {self.tmp_table_name} tmp JOIN {self.anchor1.table_name} anc1 ON tmp.{self.anchor1.bk_column_name} = anc1.{self.anchor1.bk_column_name} JOIN {self.anchor2.table_name} anc2 ON tmp.{self.anchor2.bk_column_name} = anc2.{self.anchor2.bk_column_name} LEFT JOIN {self.table_name} tie ON anc1.{self.anchor1.id_column_name} = tie.{self.anchor1.id_column_name} AND anc2.{self.anchor2.id_column_name} = tie.{self.anchor2.id_column_name} WHERE tie.{self.anchor1.id_column_name} IS NULL OR tie.{self.anchor2.id_column_name} IS NULL ) INSERT INTO {self.table_name} ({self.anchor1.id_column_name}, {self.anchor2.id_column_name}) SELECT {self.anchor1.id_column_name}, {self.anchor2.id_column_name} FROM new_values; """ result = self.engine_dwh.execute(query) self.is_success = True self.log_rowcount = result.rowcount logging.info(f'{self.log_string} [{self.log_rowcount:_}] новых строк загружены в связь') except Exception as e: self.is_success = False self.exception = str(e).replace('\'', '\'\'') raise e finally: # Удаление временной таблицы self.engine_dwh.execute(f"DROP TABLE IF EXISTS {self.tmp_table_name};") logging.info(f'{self.log_string} Временная таблица [{self.tmp_table_name}] удалена') # Закрытие подключения self.engine_dwh.dispose() # Запись в лог self.log() def get_max_anchor_bk(self): """ Получение максимального ID бизнес-ключа якоря. Используется для инкрементальной загрузки данных некоторых сущностей (например логов и транзакций, где есть инкрементальный id строки) """ try: query = f""" SELECT MAX(anchor.{self.anchor1.bk_column_name}) FROM {self.anchor1.table_name} anchor JOIN {self.table_name} tie ON anchor.{self.anchor1.id_column_name} = tie.{self.anchor1.id_column_name} """ max_bk = self.engine_dwh.execute(query).scalar() return max_bk except Exception as e: raise e finally: self.engine_dwh.dispose()
Шаг 3. Пробуем применять
Итак, представим задачу. Нам нужно реализовать поставку данных из Jira, а конкретно объекта Sprint, для отчета, демонстрирующего минимальный, медианный и максимальный Lead Time у разных команд разработки, а также его динамику по месяцам.
Пункт 1. Импортируем все наши классы, прописываем аргументы DAG и начинаем создавать задачи. Сначала инициализируем якорь jira_sprint
, прописываем создание якоря, базовый запрос и обновление якоря.
Пример DAG (загрузка якоря)
from airflow.models import DAG from airflow.decorators import task from datetime import timedelta from tools.smnx_data_tools import task_fail_alert from tools.anchor_model import Anchor, AnchorAttribute, Tie args = { 'owner': 'owner_name', 'start_date': '2024-09-04', 'depends_on_past': False, 'catchup': False, 'retries': 1, 'retry_delay': timedelta(minutes=30), 'on_failure_callback': task_fail_alert } @task def anc_jira_sprint(): query = """ SELECT "ID" AS jira_sprint_bk FROM "AO_60DB71_SPRINT" """ anchor = Anchor(name='jira_sprint') anchor.create_anchor(data_type='integer', description='Jira спринт') anchor.update_anchor(source='jira_db', query=query)
Пункт 2. Определяем атрибуты спринта. В нашем случае это название, дата начала и дата завершения. Прописываем базовый запрос, инициализируем атрибуты, создаем и обновляем их. Повторяем для каждого атрибута якоря.
Пример DAG (загрузка атрибутов якоря)
@task def attr_name(): query = """ SELECT "ID" AS jira_sprint_bk, "NAME" AS name FROM "AO_60DB71_SPRINT" """ anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='name') anchor_attribute.create_attribute(data_type='text', description='Название') anchor_attribute.update_attribute(source='jira_db', query=query) @task def attr_started(): query = """ SELECT "ID" AS jira_sprint_bk, to_timestamp("START_DATE"/1000) AS started FROM "AO_60DB71_SPRINT" """ anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='started') anchor_attribute.create_attribute(data_type='timestamp with time zone', description='') anchor_attribute.update_attribute(source='jira_db', query=query) @task def attr_completed(): query = """ SELECT "ID" AS jira_sprint_bk, to_timestamp("COMPLETE_DATE"/1000) AS completed FROM "AO_60DB71_SPRINT" """ anchor_attribute = AnchorAttribute(anchor_name='jira_sprint', name='completed') anchor_attribute.create_attribute(data_type='', description='') anchor_attribute.update_attribute(source='jira_db', query=query)
Пункт 3. Определяем связи с другими якорями. В нашем случае jira_sprint
имеет связь с jira_issue
, так как нам важно понимать, к какому спринту относится задача. Прописываем базовый запрос, инициализируем связь, создаем и обновляем её.
Пример DAG (загрузка связи)
@task def tie_jira_issue(): query = """ SELECT "ID" AS jira_sprint_bk, i.id AS jira_issue_bk FROM "AO_60DB71_SPRINT" s JOIN customfieldvalue v ON v.customfield = '10101' /* sprint */ AND v.stringvalue = s."ID"::text JOIN jiraissue i ON v.issue = i.id """ tie = Tie(anchor1_name='jira_sprint', anchor2_name='jira_issue') tie.create_tie() tie.update_tie(source='jira_db', query=query)
Пункт 4. Оборачиваем полученные задачи в DAG и ставим на регулярный запуск.
Пример DAG (определение свойств DAG, установление последовательности задач)
with DAG(dag_id='am_jira_sprint', default_args=args, schedule_interval=timedelta(hours=1), tags=['AnchorModel'], max_active_runs=1) as dag: anc_jira_sprint() >> attr_name() >> attr_started() >> attr_completed() >> tie_jira_issue()
Шаг 4. Profit — подводим итоги
С помощью anchor_model.py
мы организовали поставку данных, необходимых для отчета, за пару часов и избежали ошибок в именах объектов. По сути, вся работа инженера свелась к определению имени якоря и его атрибутов, написанию нескольких строк кода и базовых запросов. Время разработки сокращено более чем на 50%, а количество ошибок уменьшилось более чем на 95% по сравнению с полностью ручной разработкой поставки данных.
Благодаря предсказуемости имен объектов в DWH, BI-разработчик может приступить к разработке витрин данных, необходимых для создания отчетов, сразу же после первой отработки DAG. Это, в свою очередь, ускоряет разработку отчета, и бизнес быстрее получает необходимые ему наглядные визуализации данных.
Подобная автоматизация позволяет нескольким инженерам выполнить несколько поставок данных в течение одного спринта и обеспечить работой BI-разработчиков на несколько спринтов вперед.
Для демонстрации работы мы выбрали небольшой якорь с тремя атрибутами и одной связью. Конечно, в реальной работе будут встречаться якоря с гораздо большим количеством атрибутов и связей, но принципиально это ничего не меняет.
Мы будем и дальше улучшать работу модуля anchor_model.py
, о чём обязательно расскажем в продолжении цикла статей о разработке BI-системы. Оставайтесь с нами, пишите своё мнение, советы и критику в комментариях. До скорых встреч!
ссылка на оригинал статьи https://habr.com/ru/articles/849938/
Добавить комментарий