Привет, Хабр! Меня зовут Алексей Жиряков, я техлид backend-команды витрины KION. Вместе с командой мы делаем один из самых технологических онлайн-кинотеатров. Этот текст написан по мотивам митапа Evrone: рассказываю, что такое продуктовые события и какими они бывают, зачем мы создали свой приемник событий и что нужно знать об уровнях Data Quality. Так что устраивайтесь поудобнее — и поехали!
Что было на старте
Сначала мы использовали решения сторонних известных вендоров, но в какой-то момент они перестали нас устраивать. Во-первых, из-за лимитирования: у сторонних систем есть лимиты на использование. С ростом количества юзеров у нас увеличивался поток продуктовых событий. Если нам нужно было перегрузить что-то историческое, то из-за лимитов при скачивании текущих событий мы упирались в очередь. Во-вторых, для некоторых платформ сторонние системы не могут обеспечить поток в реальном времени — например, для веба.
Сейчас у нас больше 400 миллионов событий в сутки, в пике они генерят больше 6 000 RPS. А еще — 8+ продуктовых вертикалей, это product-manager с командой разработки. Продакт отвечает за конкретную область и оцифровывает свои показатели с помощью разработчиков.
Продуктовые события
Что это такое
Простой пример: пользователь нажимает кнопку «смотреть» в нашем сервисе, и сразу после этого событие отправляется на бэкэнд, фиксируя, что конкретный зритель выбрал фильм для просмотра. Для product-manager это значимое событие. Его важно оцифровать, чтобы анализировать показатели, отслеживать взаимодействие пользователя с контентом и оценивать качество данных. Все это позволяет глубже понять предпочтения юзеров и улучшить сервис.
При этом продакт генерирует гипотезы, проводит A/B-эксперименты и смотрит, как они влияют на продуктовые показатели. А потом принимает решение, двигать фичу дальше или нет.
Как выглядят продуктовые события и какие они бывают
Это обычный JSON, где передаются основные данные — профиль и еще какие-то поля, которые необходимы для этого события:
События делятся на продуктовые, технические и логи — все они служат для улучшения продукта.
Продуктовые события помогают создавать, например, персонализированные витрины. Мы анализируем предпочтения пользователей и на основе полученных данных формируем уникальные рекомендации. Через A/B-тесты проверяем гипотезы, влияющие на ключевые показатели, и внедряем только те фичи, которые доказали свою эффективность.
Технические события связаны с метриками производительности: команды работают над повышением скорости загрузки, снижением потребления памяти и общей оптимизацией. Объем таких событий обычно выше, так как они направлены на улучшение работы системы.
Логи включаются с помощью Remote Config, когда нужно отслеживать сбои или более детально изучить конкретные ситуации.
Все это — продуктовые и технические данные — поступает на наш бэкэнд и направлено на развитие и улучшение продукта.
Почему качество данных — это важно
Каждая фича имеет свою цену. Даже за небольшой полкой в нашем онлайн-кинотеатре стоит серьезная работа бэкграунд-процессов: сбор статистики, анализ данных, построение моделей. Все это требует ресурсов, причем вполне ощутимых.
Именно так видит продукт product-менеджер, когда анализирует агрегированные данные событий и следит за ключевыми метриками.
Если данные окажутся некорректными, product-менеджер может принять неверные решения, которые могут привести продукт не туда. Например, если в показателях будет учитываться отрицательное время просмотра — из-за ошибки в данных, — это исказит общую метрику и может повлиять на результаты A/B-теста. Поэтому у Data Quality критически важное значение, ведь именно от его точности зависит адекватность аналитики и обоснованность последующих решений.
Свой приемник событий
Зачем мы вообще начали его делать
Конечно, можно использовать решения от крупных ИТ-компаний, и в этом есть свои плюсы: например, они бесплатные и уже проверенные временем. Но у проприетарных инструментов много ограничений. Выше я уже писал, что у них почти всегда есть лимиты на объемы данных и количество событий.
Собственный приемник дает нам реальное время обработки данных. Мы видим, что делает юзер прямо сейчас, и можем мгновенно реагировать на его действия. Например, если пользователь начал смотреть длинный фильм, мы можем сразу перестроить витрину, убрав этот тайтл и заменив его кнопкой «продолжить просмотр».
А еще собственное решение дает вариативность: если мы получаем логи, то можем гибко распределять их — хранить меньше, экономя ресурсы, или направлять по разным потокам обработки, ориентируясь на потребности. С вендорными решениями мы вынуждены выкачивать данные и зависим от задержек и ограничений стороннего бэкенда.
Все эти факторы и привели нас к решению создать собственный приемник событий, чтобы управлять процессами максимально гибко и эффективно.
Что с архитектурой
В общих чертах она выглядит так: SDK отправляет событие на бэкенд, который сам по себе не выполняет обработку, а только направляет все в Kafka. Там есть топик для «сырых» сообщений, куда эти данные и попадают. Он транслируется в Clickhouse для отладки, где «сырая» информация хранится временно для диагностики и анализа.
К топику «сырых» сообщений подключены consumers (подписчики), а это позволяет обеспечить горизонтальное масштабирование. Consumers обрабатывают данные: парсят, валидируют, обогащают их и записывают в отдельный топик уже «распаренных» сообщений. Дальше он направляется в Clickhouse, но здесь данные хранятся постоянно.
Доступ к Clickhouse обеспечивают GUI-инструменты: Redash, Superset или любые другие SQL-клиенты, через которые аналитики могут отслеживать данные на витринах и проводить необходимый анализ.
Python или Go?
Прежде чем создавать собственный приемник событий, мы решили провести сравнительный анализ, чтобы выбрать подходящий стек. Python-программисты часто присматриваются к Go, так что мы рассмотрели оба варианта. К тому времени уже был доступен Python 3.11 с заметным приростом производительности, так что выбирать нам стало еще интереснее.
Мы разработали микросервисы на Python и Go. Каждый из них принимал сообщения, извлекал JSON из тела запроса и отправлял данные в Kafka. На стороне Python мы использовали версию 3.11 и FastAPI, а в случае Go — нативные библиотеки для веб-сервисов и работы с JSON. Проведя сравнение, мы могли лучше оценить, какой из языков оптимален для нашей архитектуры.
Тестирование показало вот что: на Python при использовании одного ядра система выдерживала примерно 800 запросов в секунду (RPS), тогда как на Go — около 3 500 RPS на одно ядро. После этого начали расти тайминги.
Мы провели тесты на Locust и WRK, и результаты были схожими: около 850 RPS на Python и примерно 3 900 RPS на Go, не учитывая тайминги.
После анализа мы поняли, что у каждого решения есть свои плюсы и минусы. При тестировании Go показал интересные результаты — 6 000 RPS. Это выглядит впечатляюще, но прирост производительности вышел не таким значительным, как мы думали. Вместо десятикратных улучшений Go оказался примерно в четыре раза быстрее, что все же не колоссальное преимущество, учитывая задачи приема данных и записи в Kafka.
Мы наблюдали рост таймингов при нагрузке на одно ядро — в итоге производительность на Python оказалась примерно в четыре раза ниже, чем на Go. Но для решения нашей конкретной задачи это не критично: мы просто добавили несколько дополнительных под на Python, каждый из которых занимает всего 200 Мб. Хотя Go обеспечивает более высокую производительность, работа с Python оказалась проще.
Анализируя плюсы и минусы Python и Go, мы отметили, что у Python есть несколько существенных преимуществ:
-
отличная скорость разработки и простота в использовании: Python как высокоуровневый язык позволяет быстрее и легче воплощать идеи в код;
-
интеграция Swagger с FastAPI, которая в Python реализована на высоком уровне. В FastAPI Swagger практически безупречен, а в случае Go с GIN нужно дополнительно описывать и настраивать документацию, что добавляет сложности;
-
навыки команды были сосредоточены именно на Python, и это стало еще одним аргументом в пользу выбора.
Хотя мы с энтузиазмом искали возможность использовать Go, в этом проекте все же выбрали Python. Да, Go более ресурсосберегающий и быстрее в несколько раз, но разница не так велика, чтобы оправдать дополнительные сложности. Например, один под микросервиса на Python занимает около 200 Мб памяти, а если запустить 10 таких под, общий объем потребления составит 2 Гб. Для текущих ресурсов, особенно учитывая современные объемы памяти даже в недорогих смартфонах, это не проблема.
Взвесив все плюсы и минусы, мы остановились на Python: скорость и простота разработки оказались для нас решающими факторами.
Уровни Data Quality
Уровни качества в нашей архитектуре можно представить как многослойную структуру:
Физический уровень включает в себя проверку на каждом этапе обработки сообщения. Сначала чекается входная схема, потом выполняется валидация, включая бизнес-правила. Это нужно, чтобы убедиться в корректности данных и их соответствии нашим требованиям. После этого проверяется выходная схема, ведь данные должны быть записаны в Clickhouse, где используется строгая типизация. Нужно убедиться, что все корректно распарсилось и соответствует форматам.
Логический уровень фокусируется на мониторинге и контроле данных. Здесь мы настраиваем alert’ы для отслеживания любых отклонений и контроля поступления данных, чтобы своевременно реагировать на аномалии и поддерживать стабильный уровень Data Quality в системе.
Качество данных, физический уровень мы валидируем с помощью Python и Pydantic 2.
А логический реализовали на Airflow.
Что такое валидация входной схемы
Получив сообщение, мы записываем его в Kafka в том виде, в котором оно пришло, но перед этим выполняем проверку. На верхнем уровне контролируем, чтобы все необходимые поля соответствовали нужным типам данных. Это включает чек query-параметров, содержимого body и других обязательных полей. То есть на этом этапе мы гарантируем, что структура данных корректна и все элементы находятся на своих местах, прежде чем сообщение отправляется дальше для обработки.
Валидация событий — следующий этап, включающий бизнес-валидацию. Здесь мы чекаем каждое событие в зависимости от его контекста, чтобы убедиться в его смысловой корректности. Например, если приходит событие телесмотрения, мы проверяем, что время просмотра указано как положительное значение, есть имя события и данные профиля пользователя. Без этих элементов само событие теряет смысл, его невозможно анализировать.
Для каждого события мы проводим валидацию конкретных полей, необходимых для его корректной обработки. На уровне входной схемы перечисляем все обязательные поля и их типы, так что при любом несоответствии валидатор сразу указывает на проблему.
Валидация реализована с помощью библиотеки Pydantic, которая упрощает этот процесс. Она позволяет задавать строгие требования к структуре данных и автоматически генерировать ошибки в случае, если какого-либо поля нет или у него неверный тип.
Event-валидация
В нашей системе более 180 типов событий, и каждое из них содержит как общие, так и специфические поля. Первые включают, к примеру, профайл, имя события и deviceID. Вторые, контекстно-зависимые поля, зависят от типа события, стоимости подписки или количества перемотанного времени — это если пользователь перематывает контент.
Для валидации мы описываем все поля в отдельных Pydantic-классах. Потом, используя механизм наследования, собираем события в соответствующие классы. Когда они поступают, мы выбираем нужный класс по имени события, создаем instance и валидируем данные, проверяя, что они отвечают описанной схеме.
С выходом Pydantic 2 мы заметили значительное улучшение производительности. Это стало возможным благодаря тому, что библиотеку переписали на Rust. В тестах, доступных в сети, сообщалось о пятикратном увеличении скорости, и мы решили провести свои замеры. Протестировали тысячу событий, распарсили их и валидировали через Pydantic, используя нашу стандартную схему с большим количеством полей.
Результаты показали, что производительность действительно возросла — в нашем случае примерно в три раза. Если вы еще не перешли на Pydantic 2, стоит рассмотреть этот вариант, особенно если скорость и оптимизация — важные факторы для вашей системы.
Когда обрабатывается большой объем данных — например, 40 миллионов событий в сутки — для экономии CPU-ресурсов вовсе не обязательно проводить валидацию каждого из них. Можно настроить ее выборочно, выделяя поля, которые дают уникальность. Они формируются SDK, так что любая ошибка, которая есть в одном событии, скорее всего, обнаружится и в других аналогичных.
В нашем случае для проверки уникальности достаточно валидации по имени события, модели устройства и версии приложения. Применяя хеширование, например через cityhash, мы можем валидировать каждое сотое событие.
Даже если новая фича распространяется только на 10% пользователей, этого будет достаточно, чтобы уловить ошибку, если она встречается в этих 10%. Это снижает нагрузку на систему и сохраняет высокое качество контроля данных.
Мониторинг-алертинг
Для оптимального уведомления об ошибках валидации мы настроили систему так, чтобы Telegram сообщал о каждой уникальной проблеме только раз в сутки. Для этого мы используем комбинацию Redis и In-Memory cache.
Вот как это работает: если ошибка появляется, сначала мы проверяем, есть ли она в памяти. Для этого создаем хеш от сообщения. Если ничего не найдено, чекаем Redis. Если и там ничего нет, отправляем уведомление в Telegram и записываем ошибку в память и Redis, чтобы исключить повторные отправки в течение дня. Redis используется, потому что у нас несколько консьюмеров. Если бы мы полагались только на In-Memory cache, они могли бы отправить множество одинаковых сообщений.
Для Redis мы ставим TTL на каждый тип ошибки до полуночи или до начала следующего рабочего дня (например, до 7–10 утра). Это позволяет получать единичное уведомление о каждой ошибке раз в сутки, исключая избыточные оповещения.
Проверка при релизе
Раньше у нас возникали проблемы, когда при выпуске новой версии приложения изменялся тип данных в каком-то поле события или какое-то из них переставало отправляться вовсе. Эти версии проходили ревью в сторе, выпускались в продакшн, и только тогда мы обнаруживали проблему. Из-за этого нужно было откатывать релиз, а это время.
Чтобы минимизировать такие ситуации, мы внедрили проверку при релизе с использованием smoke-тестов. Разработали микросервис на FastAPI, которым пользуются тестировщики. Теперь они могут запускать тесты через эндпоинты, задавая параметры вроде устройства, его типа и версии. Благодаря собственному приемнику мы в реальном времени отслеживаем все, что тестировщики кликают в ходе тестов. Это позволяет нам сразу же анализировать уникальные события, их количество и типы ошибок и предоставлять результаты до того, как версия уйдет в продакшн.
Вот так легко и непринужденно мы контролируем качество данных и обеспечиваем высокую скорость приема.
Пока все. Если есть вопросы, задавайте в комментариях — постараюсь на все ответить.
ссылка на оригинал статьи https://habr.com/ru/articles/861502/
Добавить комментарий