Apache Beam и DataFlow для конвейеров реального времени

Настройка Google Cloud
Примечание: Для запуска конвейера и публикации данных пользовательского лога я использовал Google Cloud Shell, поскольку у меня возникли проблемы с запуском конвейера на Python 3. Google Cloud Shell использует Python 2, который лучше согласуется с Apache Beam.
Чтобы запустить конвейер, нам нужно немного покопаться в настройках. Тем из вас, кто раньше не пользовался GCP, необходимо выполнить следующие 6 шагов, приведенных на этой странице.
После этого нам нужно будет загрузить наши скрипты в облачное хранилище Google и скопировать их в нашу Google Cloud Shel. Загрузка в облачное хранилище достаточно тривиальна (описание можно найти здесь). Чтобы скопировать наши файлы, мы можем открыть Google Cloud Shel из панели инструментов, щелкнув первый значок слева на рисунке 2 ниже.

Рисунок 2
Команды, которые нам нужны для копирования файлов и установки необходимых библиотек, перечислены ниже.
# Copy file from cloud storage gsutil cp gs://<YOUR-BUCKET>/ * . sudo pip install apache-beam[gcp] oauth2client==3.0.0 sudo pip install -U pip sudo pip install Faker==1.0.2 # Environment variables BUCKET=<YOUR-BUCKET> PROJECT=<YOUR-PROJECT>
Создание нашей базы данных и таблицы
После того, как мы выполнили все шаги, связанные с настройкой, следующее, что нам нужно сделать, это создать набор данных и таблицу в BigQuery. Есть несколько способов сделать это, но самый простой — использовать консоль Google Cloud, сначала создав набор данных. Вы можете выполнить действия, указанные по следующей ссылке, чтобы создать таблицу со схемой. Наша таблица будет иметь 7 столбцов, соответствующих компонентам каждого пользовательского лога. Для удобства мы определим все столбцы как строки (тип string), за исключением переменной timelocal, и назовем их в соответствии с переменными, которые мы сгенерировали ранее. Схема нашей таблицы должна выглядеть как на рисунке 3.

Рисунок 3. Схема таблицы
Публикация данных пользовательского лога
Pub/Sub является критически важным компонентом нашего конвейера, поскольку позволяет нескольким независимым приложениям взаимодействовать друг с другом. В частности, он работает как посредник, позволяющий нам отправлять и получать сообщения между приложениями. Первое, что нам нужно сделать, это создать тему (topic). Достаточно просто перейти в Pub/Sub в консоли и нажать CREATE TOPIC.
Приведенный ниже код вызывает наш скрипт для генерации данных лога, определенных выше, а затем подключается и отправляет журналы в Pub/Sub. Единственное, что нам нужно сделать, — это создать объект PublisherClient, указать путь к теме с помощью метода topic_path и вызвать функцию publish с topic_path и данными. Обратите внимание, что мы импортируем generate_log_line из нашего скрипта stream_logs, поэтому убедитесь, что эти файлы находятся в одной папке, иначе вы получите ошибку импорта. Затем мы можем запустить это через нашу google-консоль, используя:
python publish.py
from stream_logs import generate_log_line import logging from google.cloud import pubsub_v1 import random import time PROJECT_ID="user-logs-237110" TOPIC = "userlogs" publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(PROJECT_ID, TOPIC) def publish(publisher, topic, message): data = message.encode('utf-8') return publisher.publish(topic_path, data = data) def callback(message_future): # When timeout is unspecified, the exception method waits indefinitely. if message_future.exception(timeout=30): print('Publishing message on {} threw an Exception {}.'.format( topic_name, message_future.exception())) else: print(message_future.result()) if __name__ == '__main__': while True: line = generate_log_line() print(line) message_future = publish(publisher, topic_path, line) message_future.add_done_callback(callback) sleep_time = random.choice(range(1, 3, 1)) time.sleep(sleep_time)
Как только файл запустится, мы сможем наблюдать вывод данных лога на консоль, как показано на рисунке ниже. Этот скрипт будет работать до тех пор, пока мы не используем CTRL+C, чтобы завершить его.

Рисунок 4. Вывод publish_logs.py
Написание кода нашего конвейера
Теперь, когда мы все подготовили, мы можем приступить к самой интересной части — написанию кода нашего конвейера, используя Beam и Python. Чтобы создать Beam-конвейер, нам нужно создать объект конвейера (p). После того как мы создали объект конвейера, мы можем применить несколько функций одну за другой, используя оператор pipe (|). В общем, рабочий процесс выглядит как на рисунке ниже.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform] | [Second Transform] | [Third Transform])
В нашем коде мы создадим две пользовательские функции. Функцию regex_clean, которая сканирует данные и извлекает соответствующую строку на основе списка PATTERNS, используя функцию re.search. Функция возвращает разделенную запятыми строку. Если вы не являетесь экспертом по регулярным выражениям, я рекомендую ознакомится с этим туториалом и попрактиковаться в блокноте, чтобы проверить код. После этого мы определяем пользовательскую ParDo-функцию под названием Split, которая является вариацией Beam-преобразования для параллельной обработки. В Python это делается особым способом — мы должны создать класс, который наследуется от класса DoFn Beam. Функция Split принимает распаршенную строку из предыдущей функции и возвращает список словарей с ключами, соответствующими именам столбцов в нашей таблице BigQuery. Есть кое-что, что следует отметить про эту функцию: мне пришлось импортировать datetime внутри функции, чтобы она работала. Я получал сообщение об ошибке при импорте в начале файла, что было странно. Этот список затем передается в функцию WriteToBigQuery, которая просто добавляет наши данные в таблицу. Код для Batch DataFlow Job и Streaming DataFlow Job приведен ниже. Единственное отличие между пакетным и потоковым кодом заключается в том, что в пакетной обработке мы читаем CSV из src_path, используя функцию ReadFromText из Beam.
Batch DataFlow Job (обработка пакетов)
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from google.cloud import bigquery import re import logging import sys PROJECT='user-logs-237110' schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING' src_path = "user_log_fileC.txt" def regex_clean(data): PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])', r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])", r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+'] result = [] for match in PATTERNS: try: reg_match = re.search(match, data).group() if reg_match: result.append(reg_match) else: result.append(" ") except: print("There was an error with the regex search") result = [x.strip() for x in result] result = [x.replace('"', "") for x in result] res = ','.join(result) return res class Split(beam.DoFn): def process(self, element): from datetime import datetime element = element.split(",") d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S") date_string = d.strftime("%Y-%m-%d %H:%M:%S") return [{ 'remote_addr': element[0], 'timelocal': date_string, 'request_type': element[2], 'status': element[3], 'body_bytes_sent': element[4], 'http_referer': element[5], 'http_user_agent': element[6] }] def main(): p = beam.Pipeline(options=PipelineOptions()) (p | 'ReadData' >> beam.io.textio.ReadFromText(src_path) | "clean address" >> beam.Map(regex_clean) | 'ParseCSV' >> beam.ParDo(Split()) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND) ) p.run() if __name__ == '__main__': logger = logging.getLogger().setLevel(logging.INFO) main()
Streaming DataFlow Job (обработка потока)
from apache_beam.options.pipeline_options import PipelineOptions from google.cloud import pubsub_v1 from google.cloud import bigquery import apache_beam as beam import logging import argparse import sys import re PROJECT="user-logs-237110" schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING' TOPIC = "projects/user-logs-237110/topics/userlogs" def regex_clean(data): PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])', r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])", r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+'] result = [] for match in PATTERNS: try: reg_match = re.search(match, data).group() if reg_match: result.append(reg_match) else: result.append(" ") except: print("There was an error with the regex search") result = [x.strip() for x in result] result = [x.replace('"', "") for x in result] res = ','.join(result) return res class Split(beam.DoFn): def process(self, element): from datetime import datetime element = element.split(",") d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S") date_string = d.strftime("%Y-%m-%d %H:%M:%S") return [{ 'remote_addr': element[0], 'timelocal': date_string, 'request_type': element[2], 'body_bytes_sent': element[3], 'status': element[4], 'http_referer': element[5], 'http_user_agent': element[6] }] def main(argv=None): parser = argparse.ArgumentParser() parser.add_argument("--input_topic") parser.add_argument("--output") known_args = parser.parse_known_args(argv) p = beam.Pipeline(options=PipelineOptions()) (p | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes) | "Decode" >> beam.Map(lambda x: x.decode('utf-8')) | "Clean Data" >> beam.Map(regex_clean) | 'ParseCSV' >> beam.ParDo(Split()) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND) ) result = p.run() result.wait_until_finish() if __name__ == '__main__': logger = logging.getLogger().setLevel(logging.INFO) main()
Запуск конвейера
Мы можем запустить конвейер несколькими различными способами. Если бы мы захотели, мы могли бы просто запустить его локально с терминала, удаленно войдя в GCP.
python -m main_pipeline_stream.py \ --input_topic "projects/user-logs-237110/topics/userlogs" \ --streaming
Однако мы собираемся запустить его с помощью DataFlow. Мы можем сделать это с помощью нижеприведенной команды, установив следующие обязательные параметры.
project— ID вашего проекта GCP.runner— средство запуска конвейера, которое проанализирует вашу программу и сконструирует ваш конвейер. Для выполнения в облаке вы должны указать DataflowRunner.staging_location— путь к облачному хранилищу Cloud Dataflow для индексировани пакетов кода, необходимых обработчикам, выполняющим работу.temp_location— путь к облачному хранилищу Cloud Dataflow для размещения временных файлов заданий, созданных во время работы конвейера.streaming
python main_pipeline_stream.py \ --runner DataFlow \ --project $PROJECT \ --temp_location $BUCKET/tmp \ --staging_location $BUCKET/staging --streaming
Пока эта команда выполняется, мы можем перейти на вкладку DataFlow в google-консоли и просмотреть наш конвейер. Кликнув по конвейеру, мы должны увидеть что-то похожее на рисунок 4. В целях отладки может быть очень полезно перейти в логи, а затем в Stackdriver для просмотра подробных логов. Это помогло мне разрешить проблемы с конвейером в ряде случаев.

Рисунок 4: Beam-конвейер
Доступ к нашим данным в BigQuery
Итак, у нас уже должен быть запущен конвейер с данными, поступающими в нашу таблицу. Чтобы проверить это, мы можем перейти к BigQuery и просмотреть данные. После использования команды ниже вы должны увидеть первые несколько строк набора данных. Теперь, когда у нас есть данные, хранящиеся в BigQuery, мы можем провести дальнейший анализ, а также поделиться данными с коллегами и начать отвечать на бизнес-вопросы.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Рисунок 5: BigQuery
Заключение
Надеемся, что этот пост послужит полезным примером создания потокового конвейера данных, а также поиска способов сделать данные более доступными. Хранение данных в таком формате дает нам много преимуществ. Теперь мы можем начать отвечать на важные вопросы, например, сколько людей используют наш продукт? Растет ли со временем база пользователей? С какими аспектами продукта люди взаимодействуют больше всего? И есть ли ошибки, там где их быть не должно? Это те вопросы, которые будут интересны для организации. На основе идей, вытекающих из ответов на эти вопросы, мы сможем усовершенствовать продукт и повысить заинтересованность пользователей.
Beam действительно полезен для такого типа упражнений, а также имеет ряд других интересных случаев использования. Например, вы можете анализировать данные по биржевым тикам в режиме реального времени и совершать сделки на основе анализа, возможно, у вас есть данные датчиков, поступающие с транспортных средств, и вы хотите вычислить расчет уровня трафика. Вы также можете, например, быть игровой компанией, собирающей данные о пользователях и использующей ее для создания информационных панелей для отслеживания ключевых показателей. Ладно, господа, это тема уже для другого поста, спасибо за чтение, а для тех, кто хочет увидеть полный код, ниже ссылка на мой GitHub.
https://github.com/DFoly/User_log_pipeline
На этом все. Читать первую часть.
ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/462589/
Добавить комментарий