Как с нуля построить систему обработки событий

от автора

Сегодня Александр Шувалов и Юлиян Латыпов поделятся с вами опытом создания системы обработки событий в потоке данных для обогащения информации и выявления аномалий.

Если вы ранее не были знакомы с приведенными ниже терминами, рекомендуем прочесть следующие статьи.

Для начала стоит добавить немного контекста, мы занимаемся продуктом категории  UEBA. Данные это логи с устройств, на которых работают сотрудники компании.

К нам поступила следующая задача — перед записью данных в целевую таблицу в потоке проводить обогащение событий дополнительной информацией и выявлять аномалии в данных.

Немного о потоке

Нам необходимо обрабатывать 80 тысяч сообщений в секунду, все это логи с разных устройств (Windows, Linux, Mac) и специализированного оборудования (сетевые маршрутизаторы).

Возможно, вы уже сталкивались с различными материалами по потоковой обработке, что не удивительно, так как тема популярна в эпоху Big Data. Однако, почти каждый такой материал, апеллирует своими критериями/требованиями к данным, скорости обработки и т.д. Из-за чего, сложно найти готовое описание решения под конкретную задачу, которую перед вами поставили. Именно с этим мы и столкнулись.

В нашем случае, ключевыми факторами потока было:

1) Обеспечить корректный таймлайн событий. Иначе часть сервисов обогащения будет давать неверный результат;

2) Обработка событий большого размера, что также уменьшает скорость consume/produce и нагружает канал передачи данных.

Перед нами стояли следующие задачи:

  1. Определение типа события и фильтрация:

  • 45 типов событий, определяются по содержанию сообщения;

  • отфильтрованные события попадают в обработку.

  1. Обогащение данных:

  • определение зрелости по пользователю и хосту. В нашем случае, зрелость — это состояние пользователя/хоста, когда для него удалось набрать определенное число событий для каждого типа события;

  • обогащение историческими данными;

  • определение геолокации по IP-адресу;

  • определение сетевой зоны по IP-адресу;

  • определение группы устройств.

  1. Определение аномалий:

  • первая активность пользователя;

  • бездействующий пользователь. Для нас пользователь считается бездействующим, если от него не поступало событий достаточно длительный промежуток времени.

Архитектура

Прежде чем перейти к рассказу об архитектуре, стоит объяснить, почему мы не использовали готовые решения, такие как Kafka Streams, Apache Flink или Spark Streaming:

  • Новый компонент в системе потребовал бы дополнительных усилий на его освоение, настройку и поддержку. Это дополнительные ресурсы и время, что для нас не всегда приемлемо;

  • Гибкость возможностей реализации из-за активной стадии разработки продукта.

Поэтому мы решили разрабатывать собственные решения, которые обеспечат нам гибкость в построении архитектуры и позволят контролировать всё, что происходит внутри сервисов.

На первой итерации мы разработали решения на монолитной архитектуре, так как такой подход обеспечил высокую скорость разработки и выдачи MVP версии, что нам и было нужно.

Процесс выглядел следующим образом:

Мы предполагали, что сможем легко масштабировать сервисы, добавив дополнительные инстансы, чтобы увеличить скорость обработки и справиться с любым потоком данных.

Однако возникли следующие сложности:

  • нагрузка на различные виды задач была неравномерной, и, добавляя отдельный инстанс, мы могли задействовать больше ресурсов, чем требовалось;

  • отказоустойчивость была на низком уровне: отслеживать работу всех процессов и перезапускать отдельные компоненты было сложно;

  • можно было легко построить границы необходимых для работы модулей, причем было известно, что они не будут меняться со временем.

Поэтому мы решили перестроить архитектуру обогащатора на микросервисы, пока не стало слишком поздно.

Проблемы и их решения

Определение типа события и фильтрация
Первоначальное решение было реализовано на Python, это работало довольно медленно из-за большого объема сообщений. Нам бы пришлось поднимать большое количество инстансов, что увеличивало нагрузку на Kafka и потребовало больше ресурсов.

Перенесли решение на ClickHouse вместо Kafka и выполняли задачу с помощью материализованных представлений, через простые правила. В результате мы смогли улучшить производительность системы без увеличения количества инстансов и избыточной нагрузки на инфраструктуру.

Если есть способ производить вычисления в базе данных, то нужно это использовать.

Timeline
Как отмечалось ранее, в обработку должны поступать события формирующие корректный временной ряд. Одним из ключевых аспектов этого процесса является сортировка входящих данных.

Изначально для сортировки данных, поступающих к нам за различный период времени, мы решили использовать очередь с приоритетом. Мы разработали прототип и протестировали его на наших данных. Хотя решение работало, оно имело свои недостатки. Главным недостатком было большое использование оперативной памяти, которое росло с увеличением времени ожидания сообщений. Соответственно, чем дольше ожидание, тем больше требуется памяти и медленнее работает сервис. 

В итоге, мы остановились на достаточно простой в реализации идее. Поскольку данные сразу поступают в ClickHouse, мы можем запрашивать их в уже отсортированном виде “из прошлого», то есть те данные, принятые, например,2 минуты назад, и отправлять их в Kafka.

Вывод такой же

Размер сообщений
Большие размеры сообщений стали для нас серьезной проблемой, поскольку они препятствовали достижению необходимой скорости обработки потоков данных. Высокая нагрузка на сеть приводила к значительным задержкам.

Первоначально мы попытались решить проблему с помощью стандартных средств сжатия данных в Kafka, но это не дало ожидаемых результатов. 

После обратились к подходу Event Driven Architecture (EDA) для взаимодействия между сервисами. Event-ориентированный подход в чистом виде нам не подходил из-за высокой нагрузки на IO-операции с базой данных. Мы пришли к использованию дифференциальной передачи информации (diff). Этот метод позволяет передавать только полезную нагрузку события, нужную другим сервисам. 

Мы стали обрабатывать только ту часть данных события, которая необходима для наших аналитических задач. Это позволило сократить объём передаваемого сообщения до 200-300 байт, в сравнении с 6 килобайт в среднем на одно сообщение ранее это существенная разница. Хотя данное решение значительно улучшило ситуацию с трафиком, оно также породило новую проблему объединения данных, о которой мы расскажем далее.

Нужно найти баланс между производительностью сервиса и задержками на обработку сообщений, оценив максимальный полезный объем передаваемых данных и то, что можно оставить для получения сервисом из других источников, таких как БД.

Масштабирование и отказоустойчивость
Наши системы делятся на stateless и stateful  сервисы. Stateless сервисы работают легко и их также легко масштабировать, поскольку они не хранят состояние между запросами. Это делает их идеальными для распределенной обработки данных. Stateful сервисы выполняют куда более сложную задачу. Сервисы этого типа хранят состояние между запросами, что требует сохранения дампа состояний в долговременную память и эффективную синхронизацию между этими инстансами.

Проблема с синхронизацией состояния решается направлением потока в определенные инстансы по ключевым полям события — роутер. Этот компонент может разграничить потоки обработки данных на изолированные группы. Такой подход обеспечит более эффективное управление ресурсами и снизит риски возникновения проблем с синхронизацией состояния.

Для обеспечения отказоустойчивости можно поднимать два инстанса, первый из которых выполняет полезную задачу, а второй опрашивает первый о его состоянии, в случае сбоя первого, второй сервис создает процесс и занимает место закончившего работу сервиса.

Проектируйте архитектуру сервиса с учетом возможности масштабирования и обеспечения отказоустойчивости.

Объединение diff

Итоговая вставка: у нас есть изначальное большое событие и diff — наша разница, обогащающая оригинал. Мы знали, что ClickHouse хорошо работает с MatView и плохо с JOIN. Многие решения работали медленно и забивали оперативную память. 

Слева обычный JOIN, при больших объемах таблицы B, забьет оперативную память. Эту проблему удалось решить подзапросом, который уменьшает выборку данных из таблицы B для JOIN.

У нас это работает так. Создаём таблицу с движком Kafka в ClickHouse. После чего отправляем весь diff в очередь Kafka и настраиваем потребителя в ClickHouse, чтобы обрабатывать эти данные. Потребитель получает данные из очереди, определяет объемы для обработки и добавляет их в материализованное представление, которое выступает в роли триггера. В этой view происходит JOIN с использованием подзапросов по ID события, что уменьшает выборку большой таблицы, такие JOIN`ы работают быстро. Этот подход позволяет избежать прежних проблем с производительностью и перегрузкой памяти, предоставляя ClickHouse возможность оптимально распределять нагрузку и эффективнее использовать ресурсы.

“Также информация для справки: обнаружили, что Clickhouse при запросе резервирует на поле по 5 мб памяти, что при частых запросах на 100+ полей забивает ОЗУ.”

Изучайте документацию, порой там скрыты решения ваших проблем, что значительно ускорит разработку

Архитектура

Видно, что все сервисы выстроены последовательно: каждый последующий должен отработать после предыдущего. Возможно, это не самый оптимальный подход, ведь некоторые сервисы можно запускать параллельно. Однако такой вариант позволил нам избежать необходимости в конечном сервисе синхронизации, который сопоставлял бы данные всех сервисов. Это создало бы дополнительную нагрузку и увеличило вероятность ошибок.

Преимущества текущего подхода

Поскольку предложенный вариант позволяет обеспечить и поддерживать необходимый нам поток данных, мы остановились на нём. При этом мы предусмотрели возможность масштабирования каждого участка схемы. Если поток данных значительно возрастет, мы сможем перевести процесс в параллельную обработку, добавив сервис синхронизации.

Таким образом, последовательное расположение сервисов позволяет нам избежать лишней сложности и ошибок, обеспечивая стабильную и эффективную обработку данных. Это решение гибко и может легко адаптироваться к изменениям в объеме и скорости поступающих данных.


Подписывайтесь на телеграмм канал Crosstech Solutions Group, чтобы всегда быть первым в курсе событий!


ссылка на оригинал статьи https://habr.com/ru/articles/842186/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *