Весной этого года я узнал о возможности базы данных HP Vertica создавать запросы с матчингом паттернов событий. Так называемый Events Pattern Matching хорошо ложился под задачу анализировать поведение пользователей в продуктах ivi.ru. Мы решили попробовать разобраться с воронками оплаты, с поиском проблемных мест на устройствах, глубже погрузиться в анализ трафика. Нашей команде очень нравится, как реализована аналитика у Mixpanel и Localytics (она как раз основана на событиях и их свойствах), поэтому многие идеи были позаимствованы у них.
Что вообще происходит?
Исторически для аналитики мы, как и большинство остальных проектов, использовали Google Analytics. В какой-то момент на наших объемах сэмплирование данных достигло немыслимых масштабов — выборки строились менее чем на 0,5% аудитории. Это делало невозможным работу с небольшими выборками — они либо вообще не были видны, либо погрешность была катастрофичной. Плюс в GA невозможно было прокинуть кучу внутренних данных о контенте, что делало невозможным глубокий анализ.
Этот факт послужил поводом для того, чтобы заняться разработкой собственной системы. Так родился Groot — внутренняя аналитика ivi.ru.
Мы начали со списка требований, которым должен был соответствовать Groot:
- Отсутствие сэмплирования, все данные должны храниться в сыром виде;
- Кроссплатфоременность. Поскольку у нас помимо сайта есть очень популярные приложения для мобильных платформ и Smart TV, система должна уметь собирать данные даже с утюга, если он подключен к интернету и на нем стоит наше приложение;
- Возможность быстрого масштабирования;
- Отсутствие SPOF;
- Простота настройки и разворачивания.
Архитектура
Помимо колоночной базы HP Vertica, решили использовать Apache Kafka и Apache Storm, тем самым открыв для себя великий и ужасный мир Java.
Apache Kafka — pub/sub система. Основным отличием от обычных реализаций pub/sub является то, что подписчик может начать чтение сообщений не с конца, а с начала или середины. Это решение позволяет не беспокоиться о потере данных, когда подписчик не работает.
Apache Storm — распределенная система для вычислений большого объема данных. Вообще, на тему Storm можно говорить долго. Нам в нём понравилась интеграция с kafka из коробки, возможность горизонтально масштабировать систему и достаточно быстрая скорость работы.
Взгляд сверху
В целом система работает следующим образом:
- Клиент отправляет запрос с JSON-информацией о событии;
- web-сервер на flask асинхронно отправляет пачку событий в kafka;
- storm постоянно забирает новые сообщения из kafka;
- в storm топология парсит, разбирает событие и строит batch запрос в vertica и сохраняет в базу данные.
Первые неловкие шаги
Первая версия работала очень плохо. Точнее, проблем отправкой данных в kafka не было совсем (все работает из коробки). А с apache storm пришлось повозиться, так как нам надо было написать свою топологию на java, которую у нас в компании никто не знает.
Топология в storm состоит из следующих частей:
- spout — краник из которого постоянно (или нет) прилетают данные. В нашем случае это стандартный KafkaSpout;
- bolt — собственно обработчик данных. В «болтах» происходит вся магия работы с данными;
- tuple — стандартная структура данных. В tuple может хранить что угодно, от простого числа до объекта.
Я реализовал простейший bolt, который получал событие, парсил json и отправлял в базу пачку. Первые тесты выявили следующие проблемы:
- Vertica блокирует таблицу во время записи;
- Очень сложно отследить проблемные места в топологии;
- Thread с вставкой в базу мог отправлять то 1 запись, то сразу 100. Не было понимания почему так происходит;
Первая версия была очень простой: есть колонки с id, name, subsite_id, user_id, ivi_id, ts. При этом возникли трудности с таблицами в Vertica тоже оказалось сложно.
Как видите, больше никаких данных мы не записывали. Потом, правда, решили записывать еще браузер, операционную систему, размеры окна браузера, версию флэш плеера. «Ха!», — подумали мы и сделали такую таблицу:
| id | event_id | name | int_value | string_value | double_value | datetime_value | added |
Сделали второй bolt, который из JSON достает дополнительные параметры, проверяет тип и записывает все это в новую табличку.
Все было прекрасно, я радовался, что так круто получилось реализовать, аналитики радовались, что можно добавлять любые параметры в события и затем по ним строить отчеты. В то время у нас главным источником событий был сам сайт ivi.ru, мобильные приложения еще ничего не отправляли. Когда же они начали отправлять, мы поняли, что все очень плохо.
Сначала давайте посмотрим на наш запрос для простой воронки «нажал» -> «купил» для браузера Chrome:
WITH groupped_events AS ( SELECT MIN(e.ts) as added, MIN(e.user_id) as user_id, e.name, MIN(CASE WHEN ep.name = 'browser' THEN string_value ELSE NULL END) as browser from events.events as e LEFT JOIN events.event_properties as ep ON ep.event_id = e.id WHERE e.added >= '2014-07-28' and e.added < '2014-07-29' and e.subsite_id = '10' GROUP BY e.id, e.name ) SELECT COUNT(q.match_id) as count, name FROM ( SELECT event_name() as name, user_id, match_id() as match_id FROM groupped_events as e WHERE e.name IN ('click', 'buy') MATCH ( PARTITION BY user_id ORDER BY e.added ASC DEFINE click as e.name = 'click' and e.browser = 'Chrome', buy as e.name = 'buy' PATTERN P as (click buy | click) ) ) as q GROUP BY q.match_id, q.name;
Видите подвох? Мы джойним табличку (сейчас там больше миллиарда записей), группируем ее и вытаскием через CASE нужное значение. Конечно же, когда у нас стало много событий, все это стало тормозить. Запросы работали по несколько минут, что нас не устраивало. Аналитики жаловались на запросы в полчаса, продуктологи хотели устроить мне темную.
Почему?
Отдельно хочется пояснить факт, что все-таки HP Vertica это колоночная база данных. Она очень компактно хранит кучу данных в колонках и позволяет, например, добавлять новую колонку налету, без перелопачивания всех данных. С нашей же табличкой «все-в-одном» вертика справлялась очень плохо — она не понимала как оптимизировать эту кучу.
Тогда было принято решение перетащить основные параметры в таблицу events отдельными колонками, и сформировать список параметров, которые часто используются в запросах. Такую процедуру мы проделали 2 раза. В первый раз у нас появилась таблица с 30 колонками, во второй раз, уже с 50. После всех этих манипуляций, среднее время выполнения всех запросов уменьшилось в 6-8 раз.
После всех манипуляций, предыдущий запрос превратился в простой:
SELECT COUNT(q.match_id) as count, name FROM ( SELECT event_name() as name, user_id, match_id() as match_id FROM events.events as e WHERE e.name IN ('click', 'buy') MATCH ( PARTITION BY user_id ORDER BY e.added ASC DEFINE click as e.name = 'click' and e.browser = 'Chrome', buy as e.name = 'buy' PATTERN P as (click buy | click) ) ) as q GROUP BY q.match_id, q.name;
На этом мучения с базой у нас прекратились, в таком виде все живет уже около 3х месяцев и притензий у нас к ней не было.
Мы все равно оставили таблицу event_properties, чтобы можно было разрабатывать приложения быстрее, а не ждать обновления структуры основной таблицы.
Apache Storm
Разобравшись с HP Vertica, мы стали разбираться с Apache Storm: нужно было стабилизировать работу, убрать отдельный Thread и быть готовым к большим нагрузкам.
Есть минимум два способа batch-процессинга в storm:
- Отдельный thread с заполняемым списком;
- Использование стандартной возможности принимать tickTuple;
Сначала мы испробовали первый вариант и отбросили его — поведение нестабильным, запросы шли в почти в холостую. Второй вариант показал нам всю прелесть Storm:
С помощью простой настройки при создании топологии мы можем указать, когда хотим получить tickTuple (у нас 10 секунд). TickTuple это пустая запись, которая отправляется в основной поток раз в 10 секунд. Можем спокойно отследить такую запись, добавить в очередь или запись все в базу.
private static boolean isTickTuple(Tuple tuple) { return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); } @Override public void execute(Tuple tuple) { if( isTickTuple(tuple) ) { executeTickTuple(tuple); } else { executeTuple(tuple); } }
В executeTuple
мы сохраняем событие в очередь LinkedBlockingQueue
, и, соответственно, в executeTickTuple
мы проходим по очереди и вставляем пачкой в базу.
Нашу топологию мы разделили на несколько Bolt
:
- KafkaRecieverBolt — получает данные из KafkaSpout, парсит JSON и отправляет в PropertiesParserBolt;
- PropertiesParserBolt — парсит нестандратные параметры, отправляет их EventPropertiesBatchBolt, отправляет все событие дальше в EventsBatchBolt
- EventsBatchBolt — сохраняет данные в основную таблицу;
- EventPropertiesBatchBolt — сохраняет данные в таблицу доппараметров
Теперь мы можем посмотреть какой из «болтов» тормозит и сколько данных через него гоняется: Статистика работы топологии из Storm UI
Послесловие
В следующей статье я постараюсь рассказать как это все администрировать и мониторить.
ссылка на оригинал статьи http://habrahabr.ru/company/ivi/blog/236253/
Добавить комментарий