Как мы готовили распределенный джойн на Spark Structured Streaming. Доклад с RamblerMeetup&Usermodel

от автора

История о том, как суточный ETL-контур карабкался в реалтайм.

Rambler&Co – это медиахолдинг, который состоит из популярных и ежедневно читаемых изданий Рунета. Самые крупные из них давно у всех на слуху: «Лента.ру», портал «Рамблер», «Газета.Ru», «Секрет фирмы» и другие. Все они стремятся к тому, чтобы пользователь читал наиболее релевантный для него контент и проводил на площадке больше времени.

Специально для этого в рамках AdTech-подразделения холдинга выделено отдельное направление Usermodel, которое занимается анализом и сегментацией аудитории, а также повышением конверсий на площадках. Один из проектов этого направления – Recommender (система рекомендаций), в котором искусственный интеллект подстраивается под интересы пользователей и из массы контента на площадке выбирает самые интересные новости и статьи персонально для каждого посетителя сайта.

Что вы узнаете из этой статьи?

  • Зачем нам нужен Realtime-контур на Spark?

  • Как правильно написать его так, чтобы он джойнил логи на лету?

  • С какими подводными камнями можно столкнуться?

И да, кстати, недавно мы провели RamblerMeetup&Usermodel, на котором я выступил как раз с этим материалом. Посмотреть запись митапа можно тут:

С чем мы имеем дело?

Для начала проекту рекомендаций нужно узнать, как пользователи взаимодействуют с контентом, чтобы понять, что их интересует. Для этого медиаплощадки отправляют множество событий в счётчик Tоп-100 – собственную систему веб-аналитики, разработанную Rambler&Co, и каталог, ведущий подсчет популярности сайтов в Рунете. Инструменты Топ-100 позволяют смотреть, как посетители взаимодействуют с контентом, размещенным на той или иной площадке, измерять кликабельность отдельных элементов страницы, а также проводить конкурентный анализ сайтов.

Топ-100, в свою очередь, засылает события, которые ему передают медиаплощадки, в распределённую очередь Kafka, оттуда они вычитываются в HDFS и над ними строятся таблицы в Hive. Это классическая схема ETL.

Сервис рекомендаций и сам отсылает события, но уже в другую Kafka. В них примерно такое содержание: «Рекомендованы статьи <item_1>, <item_2> для пользователя <user_id>». Они проходят практически тот же путь от сервиса до хадупа, что и события от площадок. Потом эти события с фронта и бэка джойнятся для последующей аналитики. Так выглядела схема описанного контура:

Чего нам не хватало?

Один из недостатков классического ETL – суточные выгрузки. Данные будут готовы к анализу только на следующий день после того, как событие произошло. Эта проблема встала особенно остро, когда потребовалось проводить аналитику в реальном времени.

Хотелось реалтайм-аналитики – взяли Spark Structured Streaming и решили переписать контур на него. Новая схема выглядит вот так:

Важное замечание: джойн теперь лежит на плечах Spark Structured Streaming и проходит в реальном времени. А вместо таблиц хранения данных в хадупе мы воспользовались аналитической базой данных Clickhouse. Основной вопрос в этой ситуации – как провести Join потоков событий из двух Kafka-очередей в реальном времени.

Spark Structured Streaming

Что вообще это такое? Прежде всего это часть фреймворка Spark, которая отвечает за обработку потоков данных. Внешне операции над потоками очень похожи на операции в Spark SQL – те же датафреймы, те же трансформации. В потоковом пайплайне важно понимать разделение операций на stateless и stateful.

Stateful-операция в Spark – операция с сохранением состояния, в которой обработка новой записи зависит от пришедших ранее данных. Пример – Join стримов, когда в наш обработчик приходит событие из потока Топ-100 (допустим, событие клика), результат его обработки зависит от того, пришло ли до этого соответствующее событие от сервиса и каким оно было. Разберём подробнее, как такой Join происходит и как его настроить.

Realtime Join

Как выяснилось, Join двух потоков отличается от Join операции над статическими данными. Ниже несколько нюансов join-a в реальном времени, которые надо как-то разрешить.

Не все данные известны, они приходят в реальном времени.

Если при работе со Spark SQL или с базами данных можно сделать запрос за любой интересующий промежуток времени, то в задачах потоковой обработки этой возможности не хватает. Все данные, которые когда-либо приходили, не получится хранить в стейте – его нужно очищать. Но как понять, какие данные не пригодятся и когда их можно удалить? Самая удобная идея – хранить данные за последние N минут, а всё, что старше, – удалять. Звучит довольно просто. Но разберём несколько гипотетических ситуаций.

Предположим, что мы храним данные за последние 30 минут. Пользователь заходит на сайт. Сайт запрашивает у нашего сервиса персональные рекомендации. В логах сервиса появилось событие ответа, в логах клиента – событие показа. Предположим, что пользователь уйдёт на 2 часа, а потом вернётся и кликнет на рекомендованную сервисом новость. Событие попадёт в счётчик и уйдёт в Kafka, но мы потеряем его связь с ответом сервиса, потому что событие из сервиса давно удалилось из стейта.

Готовы ли мы с этим смириться? Клик через 2 часа после просмотра – ситуация нечастая, а чтобы её правильно обработать требуется слишком много ресурсов. Допустим, на это мы готовы закрыть глаза и потерять реалтайм-статистику по таким событиям. Но что произойдёт, если пользователь кликнул на статью сразу после выдачи сервиса, событие отправилось, но из-за проблем с очередью в потоковую обработку оно пришло только спустя 2 часа? Будем ли мы при каждом падении брокера терять статистику по событиям? С такими рисками уже гораздо сложнее смириться.

Получится ли обработать событие, которое произошло через минуту после запроса к сервису, но из-за проблем с очередью могло прийти гораздо позже?

Чтобы разрешить вопросы вроде этих, на помощь приходят инструменты Spark Structured Streaming для stateful-операций. Рассмотрим здесь три из них:

  1. Event time;

  2. Watermark;

  3. окно в join condition.

Event Time

Event time обозначает время, в которое событие произошло. Эта информация просто необходима при потоковой обработке в Spark со stateful-операциями. Если событие опоздает, Spark будет оперировать не временем его чтения, а временем, когда оно возникло. Почему это так важно? Посмотрим на схему:

Здесь изображён простой поток данных, в котором одно из событий опоздало. Часто они читаются из Kafka не в том порядке, в котором произошли. Событие «owl» произошло в 12:05, раньше чем «owl cat», но пришло оно позже (примерно в 12:17).

Именно Event time позволит восстановить последовательность событий и моменты времени, к которым они относятся.

Watermark

Разобравшись в Event time, можем двигаться к инструменту Watermark.

Watermark – это то время, после которого все события потока должны храниться в стейте и которое обновляется в конце обработки каждого микробатча. Watermark позволяет ограничить количество хранимых событий.

Вычисляется Watermark так:

  • берём максимальное (минимальное или среднее, есть вариации) время события в стейте (Event time, естественно);

  • вычитаем watermarkDelay – некоторое значение типа timedelta, которое мы задали в коде.

Теперь стейт не будет разрастаться, потому что старые события будут удаляться автоматически.

Так выглядит Watermark наглядно. Жёлтая линия на графике – максимальный Event time, оранжевая – Watermark. В стейте хранятся только данные за период между оранжевой и жёлтой линией, на графике это период равен примерно 20 минутам. Для нашего приложения объём стейта за это время достигает 80 ГБ.

Join Condition

Переходим к менее очевидному инструменту. Видно, что стейт в нашем приложении достаточно большой. А когда придёт новое событие из потока Топ-100, Spark будет искать в нём соответствующее событие из логов сервиса, и, так как он большой, это займет много времени. Чтобы ускорить процесс, мы хотим установить правило: все интересующие нас события на клиенте происходят в течение N минут после ответа сервиса. Чтобы поиск шёл не по всему стейту, а только по определённому временному окну вокруг события, достаточно указать ограничения на Event time обоих потоков относительно друг друга в условии джойна.

Так будет выглядеть условие для простого джойна:

s1 = input_stream1.alias("s1") s2 = input_stream2.alias("s2")  s1.join( s2, on=""" s1.word=s2.word and s1.event_time <= s2.event_time + interval '5 seconds' and s1.event_time >= s2.event_time - interval '30 seconds' """)

Теперь поиск событий из intput_stream2 для джойна независимо от размера стейта будет происходить в таком окне:

В такой ситуации джойн не произойдёт:

Нюансы, с которыми мы столкнулись

Мы использовали описанные инструменты, поставили в качестве Event time timestamp продюсера Kafka, подобрали Watermark для нашего стейта и ждали, что джойн будет проходить гладко. Но что-то пошло не так.

Первая проблема – потеря данных. ~50% данных, которые мы ожидали увидеть в Clickhouse, куда-то пропадали. В теории это было невозможно: мы использовали левый джойн, все данные левого потока должны были попасть в базу. Но на всякий случай мы замониторили Watermark и min-, avg-, max- Event time стримов. Создали тот самый график, который представлен выше как иллюстрация Watermark. И сначала он выглядел совсем по-другому:

Мы стали разбираться, откуда такие колебания в Event time, и выяснилось, что Kafka timestamp очень ненадёжный: в половине случаев приходил нулевой Event time, несмотря на то, что продюсер явно проставлял это время при отправке события. Данные мы в итоге нашли в партиции 1970-01-01 Clickhouse, потому что они партиционировались по тому же Event time.

Столкнувшись с такой ситуацией, мы сделали следующий вывод:

не надо полагаться на временные метки от брокеров сообщений / баз данных / других посредников (особенно если речь про Kafka). Значение Event time должно находиться внутри события, быть частью данных и формироваться перед отправкой.

Когда пофиксили Event time возникла вторая проблема: слишком долгая обработка. Вначале всё было в порядке – микробатч обрабатывался в пределах трёх минут, триггер на обработку происходил раз в 5 минут. Но через несколько дней время обработки начинало расти и дорастало до нескольких часов. Причина – скос партиций. Spark распределил данные для обработки неравномерно, в результате на нескольких экзекьюторах оказалась слишком большая часть данных, которые гораздо дольше обрабатывались на других экзекьюторах.

Ключи в джойне имели тип uuid, они не должны были вызывать скос, потому что генерировались случайным образом для каждого запроса к сервису. Оказалось, что среди них было очень много NULL-значений. Они попадали на один экзекьютор, и Spark пытался по ним сджойнить потоки, хотя это не имело смысла. Классический вариант решения проблемы скоса – добавить в NULL-ключи соль для более равномерного распределения – оказался не лучшим вариантом. Мы нашли более эффективный способ: выделили все события с NULL-ключами в отдельный поток и стали писать его в базу без джойна.

Схема Spark-приложения получилась такой:

Мы не вешали Watermark на события с NULL-ключами, не сохраняли их в стейт, разделили потоковую обработку на две и, таким образом, ускорили процесс.

Следующий этап – отправка в Clickhouse. Эта база данных плохо обрабатывает операции на вставку с большим количеством данных (более миллиона строк), поэтому надо их как-то разделить. Средствами Pyspark нельзя эффективно разделить данные на партиции по количеству строк (так, чтобы в каждой было не более N строк, при том, что общее число строк неизвестно), поэтому мы решили внутри каждой партиции при отправке делить данные на чанки и отправлять за один запрос не более 500 000 строк. Даже если обычно партиция насчитывает меньше полумиллиона строк, это решение будет хорошей подстраховкой на случай, если приложение выйдет из строя на некоторое время и потом будет восстанавливаться с чекпоинтов и обрабатывать одним батчем гораздо больше данных, чем в пределах одного интервала. В качестве альтернативы мы рассматривали параметр конфига `maxOffsetsPerTrigger: 500000`, но вышло так, что, на первый батч после старта стриминга это ограничение не влияет, а после старта с чекпоинтов первый батч – самый большой. И вишенкой на торте был мониторинг расхождений с существующим ETL-контуром. Сейчас это расхождение составляет менее одного процента.

Итак, подведём итоги. В процессе переезда на реалтайм-обработку наших событий мы приобрели значительный опыт, а также выявили критические ошибки, с которыми бы не пожелали бы никому сталкиваться. Это и неправильный подбор Event time, и недостаточное внимание к распределению ключей в джойне, и некорректный подбор окон. Но мы вынесли из этого процесса некоторые инсайты, которые не получится найти на страницах документации или увидеть на примере реального проекта. Сейчас наш стриминг прекрасно работает, матчится более чем на 99% с статической суточной ETL-обработкой, которая осталась рядом.

Из этого доклада вы узнали:

  • почему ETL-контура может быть недостаточно;

  • как сджойнить большие данные в реальном времени и без сохранения в HDFS;

  • как не перегрузить Clickhouse своим реалтайм-контуром.


ссылка на оригинал статьи https://habr.com/ru/articles/569932/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *