ETL-проект для начинающих Data Engineers: От почтового сервера до Greenplum

от автора

Привет, Хабр!
Меня зовут Дмитрий и я работаю инженером данных.

Это моя первая статья, в ней я хочу поделиться своим пет-проектом, который посвящен созданию ETL-процесса — это один из ключевых компонентов в работе любого Data Engineer. В моем случае проект направлен на извлечение данных из электронной почты, их преобразование и последующую загрузку в базу данных Greenplum для дальнейшего анализа и визуализации.

Идея создания этого проекта возникла у меня из реальной необходимости: я хотел лучше контролировать свои расходы в крупных продуктовых сетях, особенно в таких магазинах, как «ВкусВилл». Основная задача заключалась в том, чтобы систематизировать свои покупки по категориям товаров, анализировать эти данные и видеть динамику расходов. Конечно, существуют приложения, которые уже умеют строить подобные графики, и даже сам «ВкусВилл» предоставляет такую возможность в своем личном кабинете. Но моя цель была гораздо глубже — я хотел создать собственную систему, где данные из разных магазинов могли бы агрегироваться в одном месте. Это позволило бы не только анализировать информацию о расходах, но и использовать её для более сложных вычислений и визуализаций.

Кроме того, я всегда хотел получить все эти данные в удобном и кастомизированном формате, который был бы легко настраиваемым под свои нужды. Готовые приложения часто не дают возможности гибко изменять категории товаров или детализировать данные до нужного уровня. Поэтому в рамках этого проекта я решил использовать такие инструменты, как Python для автоматизации процессов извлечения и обработки данных, и Greenplum в качестве базы данных для их хранения и последующего анализа.

В этой статье я расскажу, как, используя Python и базу данных Greenplum, мне удалось автоматизировать процесс извлечения данных из писем от магазина «ВкусВилл», структурировать их и загрузить в базу для дальнейшей обработки. На этапе трансформации данных я разобрал, как можно извлечь ключевую информацию из писем (например, названия товаров, их количество, цену и общую стоимость заказа), а затем преобразовать эти данные в формат, удобный для дальнейшей аналитики.

Проект охватывает весь жизненный цикл данных, начиная с их извлечения из внешних источников (электронная почта), обработки и преобразования в удобный формат, и заканчивая их загрузкой в мощную аналитическую базу данных, где можно запускать сложные запросы и визуализировать результаты.

Итак, начнем.


Введение

Используемые инструменты:

  • Python для извлечения и обработки данных (IDEA PyCharm)

  • Greenplum для хранения данных (Клиент dbeaver -> БД Greenplum)

  • imaplib для подключения к почтовому серверу

  • pandas для работы с данными

  • psycopg2 для загрузки данных в Greenplum

Подготовительные шаги:

  1. Настройка почтового сервера для извлечения писем (Extract).

  2. Написание парсера для обработки данных из писем (Transform).

  3. Преобразование полученных данных в формате CSV (Transform).

  4. Загрузка данных в временную таблицу базы данных Greenplum (Load).


EXTRACT

Здесь происходит извлечение данных из почтового ящика. Подключаемся к почтовому серверу, извлекаем непрочитанные письма и сохраняем их для дальнейшей обработки.

1. Подключение к почтовому серверу

import imaplib import email from email.header import decode_header import base64 import mail_config  # Загрузка данных для подключения из конфигурационного файла mail_pass = mail_config.mail_pass  # Пароль почтового ящика username = mail_config.username  # Имя пользователя (логин) imap_server = "imap.mail.ru"  # Адрес почтового сервера  # Подключение к серверу через SSL (защищенное соединение) imap = imaplib.IMAP4_SSL(imap_server)  # Логинимся на почтовый сервер с помощью логина и пароля imap.login(username, mail_pass)  # Выбираем папку с входящими письмами для работы imap.select("INBOX")

2. Извлечение непрочитанных писем

# Поиск всех непрочитанных писем в почтовом ящике unseen_mails = imap.search(None, 'UNSEEN')    # Преобразуем результат поиска в строку для дальнейшей обработки unseen_mails_str = str(unseen_mails[1])  # Выводим список ID непрочитанных писем print('Непрочитанные письма: ', parsing_list_unseen_email(unseen_mails_str))

Пара комментариев по блоку выше:

  • Я использую команду imap.search для поиска всех непрочитанных писем с ключом ‘UNSEEN’.

  • Результат представляет собой строку с ID писем, которую можно обрабатывать с помощью функции parsing_list_unseen_email.

3. Парсинг непрочитанных писем

def parsing_list_unseen_email(unseen_emails: str):     """     Функция для получения списка ID непрочитанных писем.     :param unseen_emails: Строка с ID непрочитанных писем.     :return: Список ID писем.     """     lst_id_unseen_emails = []  # Пустой список для ID     digit_char = ''  # Переменная для временного хранения цифр     for char in unseen_emails:         # Если текущий символ — цифра, добавляем его к временной строке         if char.isdigit():             digit_char += char         else:             # Если цифры закончились, сохраняем их как ID и обнуляем временную строку             if digit_char:                 lst_id_unseen_emails.append(digit_char)                 digit_char = ''     return lst_id_unseen_emails  # Возвращаем итоговый список ID

Снова комментарии:

  • Функция обрабатывает строку с ID писем, разбивая ее на отдельные элементы (цифры) и возвращает список ID для дальнейшей работы.

  • Проходим по каждому символу строки, извлекая только цифры, представляющие ID писем.

  • Это нужно для того, чтобы знать, какие письма еще не были прочитаны

  • Важно: если вы программно открыли письмо, оно пропадет из папки UNSEEN

4. Извлечение конкретного письма

# Извлекаем одно из писем по его ID res, msg = imap.fetch(b'3123', '(RFC822)')  # Здесь 3123 — это ID письма msg = email.message_from_bytes(msg[0][1])  # Преобразуем байты в объект сообщения  # Читаем и выводим заголовок письма (например, тему) print('\nЗаголовок письма:\n', decode_header(msg["Subject"])[0][0].decode())
  • imap.fetch позволяет извлечь содержимое письма по его ID. То есть здесь можно явно указать, какое письмо прочитать.

  • Используется библиотека email для декодирования байтов в читабельное сообщение, а затем извлекается заголовок (например, тема письма). Нам нужно найти заголовок, который будет говорить, что письмо пришло из ВкусВилл.

5. Извлечение тела письма

def extract_multipart(msg):     """     Функция для извлечения содержимого письма с возможностью обработки вложенных сообщений.     :param msg: Объект сообщения.     """     with open('D:/Mail_read_files/email_body.txt', 'w', encoding='utf-8') as f:         # Если письмо многокомпонентное (содержит вложенные элементы)         if msg.is_multipart():             for part in msg.walk():  # Проходим по всем частям письма                 # Если часть письма — это текст (HTML-формат)                 if part.get_content_type() == 'text/html':                     # Раскодируем содержимое и записываем его в файл                     f.write(base64.b64decode(part.get_payload()).decode())         else:             # Если письмо не содержит вложенных частей, просто записываем его содержимое             f.write(base64.b64decode(msg.get_payload()).decode())     print('[INFO] Файл email_body.txt создан')
  • is_multipart() проверяет, есть ли в письме вложенные элементы. Это важно для обработки писем с несколькими частями. Очень сложный момент для меня лично был, поскольку такие письма — как матрешка (требуется несколько раз углубиться в структуру, чтобы добраться до тела письма).

  • Если письмо содержит HTML, то содержимое сохраняется в файл для дальнейшего анализа.

  • Мы используем base64.b64decode, так как содержимое писем часто закодировано.


TRANSFORM

После того как мы извлекли содержимое письма, переходим к его анализу. В проекте в этом примере я рассматриваю письма от магазина «Вкусвилл», содержащеи информацию о покупках.

1. Парсинг HTML-содержимого

import pandas as pd  # Инициализация пустых списков для хранения данных st = {'product_name': [], 'count': [], 'price': [], 'total_price': [], 'order_date': [], 'shop_name': []}  def transform_data_vkusvill():     """     Парсинг письма от магазина "Вкусвилл" для извлечения данных о покупках.     """     with open(r'D:/Mail_read_files/email_body.txt', 'r', encoding='utf-8') as f:         lines = f.readlines()  # Чтение всех строк файла         for i, line in enumerate(lines):             # Ищем строку с названием магазина             if 'АО "Вкусвилл"' in line:                 shop_name = 'АО "Вкусвилл"'  # Сохраняем название магазина                 # Дата заказа находится на 14-й строке после найденного названия                 order_date = lines[i + 14].split('<')[0].strip()             # Ищем строку с информацией о товаре             if 'width="40%"' in line:                 st['order_date'].append(order_date)  # Сохраняем дату заказа                 st['shop_name'].append(shop_name)  # Сохраняем название магазина                                  # Парсинг названия товара                 product_line = lines[i + 2]                 if ',кг' in product_line or ',шт' in product_line:                     product_name = product_line.split(',')[0].strip()  # Название товара                     st['product_name'].append(product_name)                                  # Парсинг цены, количества и общей суммы                 st['price'].append(float(lines[i + 4].replace(',', '.')))                 st['count'].append(float(lines[i + 6].replace(',', '.')))                 total_price = float(lines[i + 10].split('<')[0].replace(',', '.'))                 st['total_price'].append(total_price)  transform_data_vkusvill()  # Вызов функции для парсинга

Пара слов про блок:

  • Извлекаю данные о магазине, дате заказа, названии продукта, цене и количестве товара, используя специфические маркеры (например, ‘АО «Вкусвилл»‘, ‘width=»40%»‘). Думаю, что этих полей будет достаточно для дальнейшего анализа и построения визуализации.

  • Эти данные сохраняются в словарь st для дальнейшей обработки.

2. Преобразование в DataFrame и сохранение в CSV

# Преобразование данных в DataFrame df = pd.DataFrame(st)  # Сохранение DataFrame в CSV для дальнейшей загрузки df.to_csv('D:/Mail_read_files/email_body.csv', sep=';', encoding='utf-8', index=False)

LOAD

На финальном этапе я загружаем данные из CSV — файла в временную таблицу базы данных Greenplum.

1. Загрузка данных в базу данных

import psycopg2 import mail_config  def load_data_to_temp_table():     """     Загрузка данных из CSV в временную таблицу Greenplum.     """     answer = input('Точно загрузить новую пачку данных? Напишите "да" или "нет": ')          # Проверка на подтверждение действия пользователя     if answer.lower() == 'да':         # Подключаемся к базе данных Greenplum         with psycopg2.connect(             database=mail_config.db_name,             user=mail_config.user,             password=mail_config.password,             host=mail_config.host,             port=mail_config.port         ) as conn:             # Открываем курсор для выполнения SQL-запросов             with conn.cursor() as cur, open('D:/Mail_read_files/email_body.csv', 'r', encoding='utf-8') as file:                 # Загружаем данные из CSV в таблицу базы данных                 cur.copy_from(file, 'email_body_temp', sep=';')                 conn.commit()  # Подтверждаем транзакцию                 print("Данные успешно загружены")     else:         print("Загрузка отменена")  load_data_to_temp_table()  # Вызов функции для загрузки данных

Заключение

Это был мой первый пет-проект, в котором мне пришлось сразу использовать несколько ключевых инструментов, необходимых для работы Data Engineer. Я понимаю, что статья может показаться сложной, но не стоит пугаться. Попробуйте разобрать код построчно и проанализировать каждый шаг — это отличный способ лучше понять, как все работает. Обычно после такого подхода многие моменты становятся более ясными и понятными.

Я постарался снабдить код подробными комментариями, чтобы помочь вам лучше понять каждый шаг процесса. Надеюсь, это поможет вам в освоении подобных задач.

Этот проект — только первый шаг, базовый этап извлечения данных из почты. В следующей статье я расскажу, как автоматизировать процесс, поставив его на расписание, а также покажу, как подключить BI-инструменты для создания визуализаций, чтобы вы могли видеть свои данные в удобной и красивой форме.

Я веду свой блог в телеграм по data engineering. Пишу про область, рабочие кейсы, немного лайфстайла. Будет интересно больше новичкам в области, но также поддержу разговор с middle+ инженерами. Буду рад, если в моем канале найдете для себя полезности.

А тажке ссылка на репозиторий, где вы можете ознакомиться с моими материалами.


ссылка на оригинал статьи https://habr.com/ru/articles/849062/