Автоматизация процессов в DWH с помощью Python и Snowflake

от автора

Привет, Хабр!

Сегодня рассмотрим тему автоматизации процессов в хранилищах данных с помощью мощного тандема — Snowflake и Python. Разберем, как с помощью Python можно легко подключаться к Snowflake, загружать данные, управлять таблицами и автоматизировать повседневные задачи.

Настройка среды

Создаем виртуальное окружение для проекта, чтобы изолировать зависимости:

# Для Windows python -m venv venv venv\Scripts\activate  # Для macOS/Linux python3 -m venv venv source venv/bin/activate

Устанавливаем официальный коннектор Snowflake для Python с помощью pip:

pip install snowflake-connector-python

Затем настраиваем подключение к базе данных Snowflake:

import snowflake.connector  conn = snowflake.connector.connect(     user='YOUR_USERNAME',     password='YOUR_PASSWORD',     account='YOUR_ACCOUNT_IDENTIFIER',     warehouse='YOUR_WAREHOUSE',     database='YOUR_DATABASE',     schema='YOUR_SCHEMA' )  # курсор для выполнения операций cursor = conn.cursor()

Помимо коннектора Snowflake, могут понадобиться следующие библиотеки:

  • pandas;

  • sqlalchemy и snowflake-sqlalchemy: для использования ORM и облегчения взаимодействия с БД;

  • python-dotenv: для управления переменными окружения и безопасного хранения учетных данных;

  • schedule или APScheduler: для планирования и автоматизации задач.

Теперь создаем файл .env и добавляем в него учетные данные:

SNOWFLAKE_USER=YOUR_USERNAME SNOWFLAKE_PASSWORD=YOUR_PASSWORD SNOWFLAKE_ACCOUNT=YOUR_ACCOUNT_IDENTIFIER

Затем в скрипте загружаем эти переменные:

import os from dotenv import load_dotenv  load_dotenv()  user = os.getenv('SNOWFLAKE_USER') password = os.getenv('SNOWFLAKE_PASSWORD') account = os.getenv('SNOWFLAKE_ACCOUNT')  #  переменные для подключения conn = snowflake.connector.connect(     user=user,     password=password,     account=account )

Синтаксис Python для работы с Snowflake

После настройки окружения и установки необходимых библиотек, можно приступать к работе с Snowflake через Python. Основным инструментом для этого является Snowflake Connector for Python.

Пример подключения и выполнения простого запроса:

import snowflake.connector  # соединение conn = snowflake.connector.connect(     user='YOUR_USERNAME',     password='YOUR_PASSWORD',     account='YOUR_ACCOUNT_IDENTIFIER' )  # курсор cursor = conn.cursor()  # SQL-запрос cursor.execute("SELECT CURRENT_VERSION()")  # результат version = cursor.fetchone() print(f"Текущая версия Snowflake: {version[0]}")  # закрытие соединения cursor.close() conn.close()

В этом примере подключаемся к Snowflake, выполняем запрос для получения текущей версии базы данных и выводим результат.

При работе с базами данных важно правильно управлять транзакциями и обрабатывать возможные ошибки. В Snowflake Connector for Python управление транзакциями осуществляется с помощью методов commit() и rollback().

Пример управления транзакциями и обработки исключений:

try:     # начало транзакции     conn.cursor().execute("BEGIN")      # выполнение операций     cursor.execute("INSERT INTO employees (id, name) VALUES (1, 'Artem')")     cursor.execute("INSERT INTO employees (id, name) VALUES (2, 'Ivan')")      # подтверждение транзакции     conn.cursor().execute("COMMIT") except Exception as e:     # откат транзакции в случае ошибки     conn.cursor().execute("ROLLBACK")     print(f"Ошибка: {e}") finally:     cursor.close()     conn.close()

В этом коде, если во время выполнения операций произойдет ошибка, транзакция будет откатана, и данные не будут сохранены.

Загрузка данных может быть выполнена с помощью команд PUT и COPY INTO, либо с использованием pandas.

Пример загрузки данных из CSV-файла с помощью COPY INTO:

# загрузка файла в внутренний stage cursor.execute("""     PUT file://path/to/data.csv @%your_table """)  # копирование данных из файла в таблицу cursor.execute("""     COPY INTO your_table     FROM @%your_table/data.csv     FILE_FORMAT = (TYPE = 'CSV', FIELD_DELIMITER = ',', SKIP_HEADER = 1) """)

Можно выгрузить данные из таблицы в локальный файл или работать с ними непосредственно в pandas.

Пример выгрузки данных в DataFrame:

import pandas as pd  # запрос и получение данных в DataFrame df = pd.read_sql("SELECT * FROM your_table", conn)

Используя pandas, можно выполнять различные преобразования данных перед их загрузкой или после выгрузки.

Пример преобразования данных перед загрузкой:

# Чтение данных из CSV df = pd.read_csv('path/to/data.csv')  # Преобразование данных df['total_price'] = df['quantity'] * df['unit_price']  # Загрузка данных в Snowflake from sqlalchemy import create_engine engine = create_engine('snowflake://...')  df.to_sql('your_table', engine, if_exists='append', index=False)

5 сценариев применения

Автоматическая ежедневная загрузка данных

Допустим, нужно ежедневно загружать новые данные из локального CSV-файла в таблицу Snowflake:

def update_aggregates():     conn = snowflake.connector.connect(...)     cursor = conn.cursor()      # Обновление агрегатов     cursor.execute("""         INSERT INTO hourly_aggregates         SELECT CURRENT_TIMESTAMP, COUNT(*)         FROM transactions         WHERE transaction_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP)     """)      conn.commit()     cursor.close()     conn.close()     print("Агрегированные данные обновлены.")  # Планирование задачи на каждый час schedule.every().hour.do(update_aggregates)

Обновление агрегированных данных

Необходимость в ежечасном обновление агрегированные данные для отчетности:

def clean_old_data():     conn = snowflake.connector.connect(...)     cursor = conn.cursor()      # Удаление старых записей     cursor.execute("""         DELETE FROM user_activity         WHERE activity_date < DATEADD(year, -1, CURRENT_DATE)     """)      conn.commit()     cursor.close()     conn.close()     print("Старые данные удалены.")  # Планирование задачи на каждое воскресенье в 03:00 schedule.every().sunday.at("03:00").do(clean_old_data)

Мониторинг и уведомления об ошибках

Сценарий: Отслеживать ошибки в процессе ETL и отправлять уведомления ответственному лицу.

Код:

def backup_critical_tables():     conn = snowflake.connector.connect(...)     cursor = conn.cursor()      critical_tables = ['customers', 'orders']     for table in critical_tables:         backup_table = f"{table}_backup_{time.strftime('%Y%m%d')}"         cursor.execute(f"CREATE TABLE {backup_table} CLONE {table}")      conn.commit()     cursor.close()     conn.close()     print("Резервные копии созданы.")  # Планирование задачи на первое число каждого месяца в 01:00 schedule.every().month.at("01:00").do(backup_critical_tables)

Очистка устаревших данных

Допустим, нужно еженедельно удалять данные старше одного года для оптимизации хранилища:

def backup_critical_tables():     conn = snowflake.connector.connect(...)     cursor = conn.cursor()      critical_tables = ['customers', 'orders']     for table in critical_tables:         backup_table = f"{table}_backup_{time.strftime('%Y%m%d')}"         cursor.execute(f"CREATE TABLE {backup_table} CLONE {table}")      conn.commit()     cursor.close()     conn.close()     print("Резервные копии созданы.")  # Планирование задачи на первое число каждого месяца в 01:00 schedule.every().month.at("01:00").do(backup_critical_tables)

Автоматическое резервное копирование данных

Сценарий: Ежемесячно создавать резервные копии критически важных таблиц.

Код:

def backup_critical_tables():     conn = snowflake.connector.connect(...)     cursor = conn.cursor()      critical_tables = ['customers', 'orders']     for table in critical_tables:         backup_table = f"{table}_backup_{time.strftime('%Y%m%d')}"         cursor.execute(f"CREATE TABLE {backup_table} CLONE {table}")      conn.commit()     cursor.close()     conn.close()     print("Резервные копии созданы.")  # Планирование задачи на первое число каждого месяца в 01:00 schedule.every().month.at("01:00").do(backup_critical_tables)

Заключение

Автоматизация процессов в Snowflake с помощью Python открывает широкие возможности для оптимизации работы с хранилищем данных.


Напоминаю про открытый урок «Эффективный анализ данных: Погружение в мир DWH и аналитической инженерии», который пройдет в Otus 23 сентября. На этом уроке участники узнают:

  • Основы DWH (Data Warehouse): Понимание архитектуры и ключевых компонентов хранилищ данных, а также их роли в бизнес-аналитике.

  • Инструменты и технологии: Обзор современных инструментов для работы с DWH, таких как ETL-процессы, BI-платформы и языки запросов (SQL).

  • Практические кейсы: Разбор реальных примеров использования DWH для принятия обоснованных бизнес-решений и оптимизации процессов.

Записаться можно на странице курса «Data Warehouse Analyst».


ссылка на оригинал статьи https://habr.com/ru/articles/843522/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *