Пайплайны записи своими руками: думали — велосипед, оказалось — паттерны

от автора

Привет, Хабр! Я Роман Щербаков, ведущий инженер в Sage — это платформа мониторинга в Т-Банке, которую мы разрабатываем с 2019 года. За пять лет нагрузка на платформу возросла многократно, и, чтобы ее выдерживать, мы постоянно докручиваем наше решение. 

В этой статье расшифровка моего доклада с Saint HighLoad++ 2024 о том, как мы строим нагруженные пайплайны записи. И о том, как было бы здорово заранее знать, что нам потребуется. Мы придумали много всего для надежной работы пайплайнов, а посмотрели ретроспективно, и оказалось, что это просто одни сплошные стандартные паттерны.

Главная характеристика продукта — надежность

Недавно читал SRE-book и встретил отличную цитату Бена Трейнора Слосса. В моем вольном переводе она звучит примерно так: «Главная характеристика продукта — это надежность, потому что система бесполезна, если ею нельзя пользоваться». 

Мне кажется, это нужно добавлять пререквизитом к архитектуре каждой системы. И после этого только собирать нефункциональные и функциональные требования. 

Давайте разберемся, что такое надежность. Я нашел кучу разных определений, предлагаю настроиться на одну волну и понимать под надежностью следующее.

Надежность — это то, насколько пользователи готовы доверять нашему продукту свои задачи. Виды надежности — доступность (availability) и устойчивость (resilience).

Надежность позволяет системе быть готовой к эксплуатации в нужный момент и соответствовать заявленным характеристикам во время этой эксплуатации. 

Стоит обратить внимание еще на отказоустойчивость — способность системы сохранять свою надежность во время эксплуатации и продолжать работать, если какие-то из компонентов отказывают.

Sage — это observability-платформа в Т-Банке. И для таких систем основой выступают данные телеметрии. Мы процессим джентльменский набор: логи, метрики и трейсы. 

Поток телеметрии в секунду

Поток телеметрии в секунду

Кстати, для пайплайна записи принципиальное значение имеет размер потока, а не то, сколько данные занимают на дисках. 

Например, логов мы процессим примерно 3 ГБ в секунду — примерно 3 млн ивентов. Метрики — 450 МБ пожатых данных. Если разжать,  будет раз в десять больше: 12 идн точек и трейсов — 500 МБ или 2 млн спанов. 

Данных довольно много. Как мы справляемся с такой нагрузкой? Чтобы разобраться, посмотрим, как выглядит текущая архитектура Sage.

Архитектура Sage

Начнем с логов. Приложения либо сами пушат в нас данные через OTEL-коллекторы, либо какие-нибудь файлбиты собирают логи с хостов и заливают нам в Kafka. Дальше специальный Logs Worker складывает эти данные в Elasticsearch для полнотекстового поиска. 

Архитектура пайплайна записи логов

Архитектура пайплайна записи логов

В метриках либо мы собираем данные с приложений, либо в нас пушат по протоколу Remote Write VM-агенты и Прометеусы. После этого все попадает в Kafka. Дальше специальный Metrics Worker процессит данные и складывает в VictoriaMetrics.

Архитектура пайплайна записи метрик

Архитектура пайплайна записи метрик

Трейсы приложения пушат в OTEL-коллектор. Все это тоже складывается в Kafka. Потом уже Traces Worker все раскладывает по куче разных ClickHouses.

Архитектура пайплайна записи трейсов

Архитектура пайплайна записи трейсов

Как видите, друзья, у этих пайплайнов много общего. Такой стиль архитектуры называется N-Tier — очень удобная штука, которая позволяет разделять логические слои в приложении и физические тиры. Деплоймент отделяются от логики, поэтому можно гибко масштабировать систему, повышать отказоустойчивость всего пайплайна в целом.

N-Tier-архитектура

N-Tier-архитектура

Проблемы и паттерны для их решения

Во время эксплуатации мы встретились с огромным количеством проблем и для каждой нашли решение. 

Масштабирование системы. У нас много дата-центров и клиентов. Под клиентами я подразумеваю команды, которые пишут свои приложения. Соответственно, телеметрия из этих приложений — то, что для нас является данными клиентов. 

У клиентов может быть разный SLA, а у наших компонентов может быть разная производительность, пропускная способность и так далее. 

Как же все это объединить и порешать? В первую очередь нужно ввести понятие Data Gravity. 

Data Gravity придумали где-то 10—15 лет назад. История о том, что большие данные похожи на большую массу, притягивают к себе другие данные, приложения и сервисы. 

В контексте дата-центров это означает, что там, где данные создались, там они и должны оставаться. Их не надо двигать между дата-центрами — это дает хорошую экономию.

В контексте Sage это означает, что приложение, которое запущено в конкретном дата-центре, должно пушить телеметрию внутри этого дата-центра. После мы должны хранить данные внутри того же самого дата-центра, после чего данные будут охлаждаться внутри дата-центра и в конце концов удаляться просто по ретеншену.

Если мы никуда ничего не двигаем, это радикально повышает надежность системы и доступность данных. Также эта штука позволяет нам применять Deployment Stamps Pattern внутри дата-центров. 

Deployment Stamps Pattern — в каждом дата-центре свой пайплайн

Deployment Stamps Pattern — в каждом дата-центре свой пайплайн

У нас нет какого-то такого большого Sage, который один на всех клиентов. Пайплайны записи локализованы внутри дата-центров, то есть каждый дата-центр — отдельный пайплайн, и масштабируем мы Sage именно такими пайплайнами. 

Запускаем новый дата-центр — поднимаем отдельный пайплайн. Нужно каких-то отделить клиентов друг от друга, какие-то особые требования для них выполнить — мы строим внутри дата-центра тоже отдельный пайплайн. 

Наш подход отлично работает, когда нужно посчитать capacity. Человек приходит и говорит: «Нам нужно отливать в вас 100 МБ логов». Мы посмотрели: ага, capacity есть, заезжаем. Если нет, мы говорим: «Пора строить новый пайплайн». Тогда уже ведутся совершенно другие работы. 

Большая нагрузка. В нашем случае это неприятная история, потому что у нас большое число писателей, и у этих писателей совершенно разные профили нагрузки, которые нам не подконтрольны. 

Люди могут совершенно по-разному писать метрики, логи и трейсы, и с этим очень трудно бороться. А еще на нас сильно влияют массовые отказы. Например, в каком-нибудь дата-центре какой-нибудь луч питания отключился, и куча приложений, которые связаны с этой частью инфраструктуры, начинают писать больше ошибок. Как следствие, у нас в системе начинают появляться спайки. 

Важно понимать, что Sage занимает не больше 10% инфраструктуры T-Банка, а это означает, что большой T-Банк пытается завалить телеметрией маленький Sage. 

Для решения проблемы больших нагрузок мы много чего делаем, но в первую очередь изучаем входящий поток. Для примера — поток логов.

Входящий поток логов

Входящий поток логов

Если посмотреть на размах колебаний, получается, что это примерно 50% от того рейта, который мы в среднем процессим внутри Sage, — это огромная разница. 

Наверное, можно было бы разделить клиентов на какие-то группы и таким образом как-то выровнять рейт, но есть схема получше — Queue-based Load Leveling Pattern.

Queue-based Load Leveling Pattern

Queue-based Load Leveling Pattern

Если у нас есть входящий рейт, мы добавляем посередине буфер, который накапливает в себе данные, и этот буфер вычитываем с каким-то удобным для нас рейтом. В нашей архитектуре такую задачу выполняет Kafka. Под каждого клиента есть отдельные топики — очереди, и мы сглаживаем разные профили и читаем с той скоростью, с которой готовы принять эти данные. 

Pull-модель получения данных

Pull-модель получения данных

Самое слабое место в пайплайне записи — хранилище, нам нужно его защитить. Поэтому мы используем pull-модель, которая позволяет вычитывать данные с нужной скоростью с учетом возможностей хранилища.

Надежность воркеров. Какими бы надежными ни были готовые компоненты, свои компоненты тоже должны быть очень надежными, потому что там возникает куча своих проблем.

Например, консюмеры, которые читают сообщения с Kafka, могут частично отказывать, или очереди могут не успевать обрабатываться, потому что изменился профиль у клиента. 

У клиентов разный поток, и потоки могут в десятки раз отличаться друг от друга, поэтому мы используем Competing Consumers Pattern. 

Сompeting Consumers Pattern

Сompeting Consumers Pattern

Есть очередь, мы подключаем к ней большой набор консюмеров. Каждый консюмер имеет запас производительности, поэтому мы не боимся частичного отказа консюмеров и будем продолжать обрабатывать данные, даже если был какой-то сбой и часть инфраструктуры пострадала. А клиенты при этом даже не заметят, что у нас внутри что-то происходит.

Исходя из этого паттерна, мы построили воркеры следующим образом: 

На каждом инстансе есть отдельный воркер, который состоит из множества джобов. Каждый джоб обслуживает отдельного клиента и состоит из трех слоев. Это консюмер, который читает и обрабатывает данные, потом идет диспетчер, который решает, куда эти данные надо записать, и DBClient, который занимается записью данных в хранилище. 

Миллионы запросов. В нас летит 3 млн ивентов в секунду. Если в лоб решить задачу, получится 3 млн запросов в БД. Ни одной БД это не понравится. А еще разработчики хранилища рекомендуют снижать рейт на запись, потому что это часто вызывает большую деградацию производительности. Для решения проблемы мы выбрали батч-процессинг. 

Batch Processing

Batch Processing

Консюмер вычитывает сообщения из очереди и набирает буфер по Thresholds на количество сообщений, общий размер буфера, время на набор этого буфера. После чего целиком отправляет этот буфер в хранилище.

В Elasticsearch есть специальный bulk API для этого, в ClickHouse — рекомендация за раз писать по 100 тысяч строк. На метриках такой проблемы нет, потому что когда мы собираем с таргетов метрики, они уже пачками нам прилетают, и чаще приходится, наоборот, эти батчи поменьше делать, чтобы не 100 МБ сразу в БД записывать. 

Повторная запись. Как только мы доходим до записи, рано или поздно возникает проблема дублей. Самый простой способ получить дубль — прочитать сообщение, записать его в хранилище и не закоммитить оффсет в Kafka. Хотя эти события не очень частые, но с этим все равно нужно бороться.

Есть два варианта борьбы с дублями:

  1. По классике. Поднять отдельную БД, сохранять там оффсеты и проверять, что мы эти данные уже процессили. Но нам не подходит такой вариант, потому что тогда миллионы запросов прилетят в эту БД. 

  1. Обогатить данные каким-то идентификатором и проверять с его помощью данные на уникальность. Но эта проверка увеличивает нагрузку на основное хранилище.

Мы используем второй вариант. Для этого в случае логов мы генерируем идентификатор для каждой записи. В трейсинге берем готовые идентификаторы из спанов. Для метрик идентификатором выступает вся Time Series. 

Для проверки на уникальность в Elasticsearch и VictoriaMetrics мы используем штатные фичи: Unique Constraint и отложенную дедупликацию соответственно. А в трейсинге мы сами дедуплицируем данные при поиске — просто выкидываем дубли из общего ответа, если они есть.

Много данных. Рано или поздно с ростом системы все сталкиваются с проблемой большого количества данных. Классическая история, когда данные могут не влазить в сервер. БД может не иметь кластерной версии, как долго было у VictoriaMetrics. Еще кластерная версия может быть неудобной для обслуживания или кластер становится слишком большим и по своим характеристикам просто не вмещает тот объем данных, который нужно в нем хранить. 

Если у вас один кластер, это серьезная проблема. Потому что отказ одного кластера — это полная потеря данных. Также я добавил бы сюда особые требования к хранению, безопасности, может быть, комплаенс. У разных клиентов могут быть такие штуки.

Схема шардирования

Схема шардирования

Шардирование — отличная техника для решения проблем с большим количеством данных. Обычно разработчики доверяют эту задачу самой БД. У многих БД есть отличные встроенные механизмы. 

Мы используем гибридную схему: часть данных шардируем средствами БД для оптимизации работы хранилища, а остальное шардируем на клиенте и сами решаем, на какую ноду или целый кластер данные записать. Так мы горизонтально масштабируем хранилище до неимоверных размеров. У нас только логов лежит под 7 ПБ.

Шардирование позволяет сделать первый шаг к мультитенантности и разделить клиентов по разным хранилищам. 

Доступность это проблема, когда может происходить куча всего: отказ оборудования, вывод оборудования на обслуживание, замена батареек или плашек памяти. Есть еще не очень очевидная, но тоже связанная с доступностью данных проблема — охлаждение данных, потому что мы не все можем держать в горячем хранилище. Нужно строить даты-тиры, где данные будут потихонечку остывать и дольше храниться при этом. 

Для решения мы используем репликацию данных.

Схема репликации

Схема репликации

У нас работает гибридная схема. Мы используем репликацию внутри хранилища, если есть такая возможность, чтобы поднять надежность самого кластера. А также иногда делаем репликацию еще на стороне клиента. 

Например, так мы сделали долгосрочное хранение для метрик. У нас два типа кластеров: краткосрочное хранение и долгосрочное хранение. Мы льем туда одинаковый поток, но при этом под этими кластерами разное железо, у них разный профиль поисковой нагрузки и разные конфигурации самих кластеров. Эта штука прекрасно работает, и могу сказать, что репликация на клиенте — это окей. 

Если хочется больше узнать о том, как мы работаем с данными, как выглядит наше хранилище изнутри, как мы конфигурируем, в частности, Elasticsearch, мой коллега Рома Николаев рассказывал эту историю. 

7 петабайт логов в Elastic
Всем привет, меня зовут Роман. В ИТ я больше 15 лет — начинал как системный администратор, сейчас SR…

habr.com

Один отказ может убить всю систему. Надо признать, что отказы — это норма. К этому надо быть готовым, с этим можно спокойно жить и вообще никак не переживать. 

Но хочется управлять влиянием отказов на систему и быть готовым к этим нештатным ситуациям. А еще хочется иметь возможность экстренного изменения состояния продакшена. Наверное, любой SRE скажет, что рано или поздно надо где-то что-то руками резко стопать и такая возможность должна быть. Пайплайн не должен от этого разваливаться. 

В судостроении придумали такую технику — добавлять перегородки на дно корабля, чтобы одна маленькая пробоина не потопила весь огромный корабль. В программировании это называется Bulkhead Pattern.

Мы выбираем ресурс в системе, например ноды или кластер хранилища. И добавляем Concurrency Limiter, который этот ресурс защищает. 

Concurrency Limiters очень чуткие, по сути, это семафоры. Они контролируют количество параллельных реквестов, которые летят в эту БД. 

Могу сказать, что такой Concurrency Limiter работает в разы быстрее, чем любой Rate Limiter. Изменение Response Time мгновенно забивает Concurrency Limiter, и запись останавливается. 

Реализация Bulkhead Pattern с помощью Concurrency Limiters

Реализация Bulkhead Pattern с помощью Concurrency Limiters

Ответы с ошибкой при записи в БД это норма до какой-то степени, они регулярно случаются. Причин может быть целый вагон: проблемы с сетью, сервер сломан или перегружен, сломалась сама БД.

Хорошая стратегия — поретраить, отправить еще один запрос. В целом я согласен, но есть пара деталей. 

Как делать правильные ретраи:

  1. Настроить экспоненциальную задержку между попытками.

  2. Ограничить количество попыток.

  3. Добавить джиттер, чтобы аккуратненько размазывать ретраи.

Специалисты из AWS провели исследование. Оказалось, что такая стратегия все равно собирает спайки и для крупных систем это становится критическим. Представьте, БД у вас очухалась, вы начали туда писать, снова прилетает такой спайк и опять ее заваливает. Опять БД не работает. Проблема. 

Мы поисследовали, в каких случаях и что происходит. Для пайплайна записи есть два кейса. Первый: все хорошо, но что-то моргнуло, можно ретраить безопасно, ничего страшного не произойдет. Вторая ситуация, когда БД отказывает. Она либо сильно деградирует, либо просто ее больше нет. Выключилась, серверы сгорели и так далее. Тогда не ретраить смысла нет. 

Ретраи — очень опасная штука. Они повышают Concurrency на БД, что может ее убить, поэтому за ретраями надо очень четко следить. 

Наш рецепт такой: ретраим, но один раз, без задержек. В надежде на то, что это просто что-то моргнуло. Для оптимизации инфраструктуры у нас в код встроены балансеры. Поэтому, если есть возможность, мы просто меняем ноду и отправляем в следующую. 

Такой подход радикально увеличивает количество успешных записей. 

Много ответов с ошибками. Наши воркеры — это множество независимых джобов под каждого клиента. Параллельно эти джобы отправляют реквесты, и, допустим, все они выбрали одно хранилище. Представим, что у одного ошибка, у второго, у третьего. Хочется перестать слать эти падающие запросы в БД, остановиться и отловить. 

Для этого используется паттерн Circuit Bbreaker. Это такая штука, которая добавляется между реквестом и хранилищем в коде. Паттерн контролирует количество параллельно происходящих ошибок. Если какой-то Tthreshold переполняется, мы просто его размыкаем и реквесты мгновенно получают ответ: cорян, сейчас записать не получится, ищите другой способ. 

Таймауты есть повсюду: во фреймворках, библиотеках, приложениях. Мы сами постоянно их добавляем, и у них множество имен. Это какие-то задержки, какие-нибудь Max Poll Interval, просто интервал, Duration — миллион имен, но все это таймауты. 

В нашем случае таймауты собираются в воронку. Ее можно назвать бюджетом времени на исполнение. 

Мы читаем сообщение из Kafka, у консюмера есть обязательный параметр Max Poll Interval. Kafka требует от нас раз в какое-то время запросить сообщение, иначе она подумает, что мы померли или зависли, и отключит нас. Это задает верхнюю границу времени для обработки сообщения.

Все следующие шаги — это ожидание свободного слота в Concurrency Limiter, задержка между ретраями, максимальное время ожидания ответа от БД. Представьте, взяли 30 секунд просто на ожидание от общего процессинга и съели. И даже время процессинга внутри самой БД тоже ест этот бюджет. 

Рекомендация простая. Когда мы пишем код и хотим добавить новую задержку, новые таймауты, еще что-то, надо внимательно согласовать его со всеми остальными таймаутами в пайплайне, чтобы не вылезти за бюджет и чтобы нам хватило времени на процессинг.

План Б, когда паттерны не спасают

Я рассказал о полезных паттернах, которые позволяют решать множество проблем. Но всегда есть ситуация, когда что-то пошло не так и у нас не хватило попыток на запись, кончилось время, Concurrency Limiter сказал, что нет слотов. 

Для таких ситуаций нужен план Б. Есть такой специальный механизм — называется Fallback. Переводится как «резервный план или процесс». В качестве резервного плана можно сделать следующее. 

Отбросить данные. Ни один пользователь, который шлет в нас телеметрию, не захочет, чтобы мы удаляли их метрики или трейсы. Они же им нужны. 

Остановиться, если есть какая-то ошибка. Получили ошибки — остановились, ничего не пишем. База очухалась — начали дальше в нее писать. Вроде хорошо. Если надо надолго остановиться, у нас в Kafka накапливаются буферы. В конце концов, место закончится и мы снова начнем терять данные, поэтому такая стратегия не очень подходит. 

Можно попробовать неблокирующие ретраи звучит многообещающе, как по мне. 

А еще есть такая штука, как Dead Letters Queue? — это перекладывание данных в специальное место, чтобы мы потом разобрались, что с ними делать, почему они там оказались. 

Мы используем последние два способа. Давайте смотреть, как это все работает вместе. 

Общая схема обработки сообщения при использовании неблокирующих ретраев и Dead Letters Queue

Общая схема обработки сообщения при использовании неблокирующих ретраев и Dead Letters Queue

Есть какая-то Source Qqueue в Kafka, мы вычитываем данные оттуда, проверяем, хорошие они или нет: биты, не биты, соответствуют формату и так далее. После этого, если они не соответствуют, складываем их в Error Queue (Dead Letter Qqueue). А если они хорошие, пишем в базу. 

Если по какой-то причине в базу записать не получилось, мы перекладываем эти данные в специальную Retry Queue для отложенной обработки.

Схема отложенной обработки сообщения

Схема отложенной обработки сообщения

Отдельным процессом вычитываем данные из Retry Queue специальным fallback-консюмером и складываем в БД. Если БД недоступна, пробуем еще раз и еще. Тут уже совсем другие политики работают. Если по какой-то причине БД сказала, что обработать такой тип реквеста мы не можем, мы тоже складываем эти данные в специальную Error Queue, которую потом руками разбираем и что-то фиксим. Потому что другой причины здесь быть не может. 

Особенности fallback-консюмеров: 

  • Они не могут писать с той же скоростью, с которой пишут основные консюмеры. Помните: Concurrency убивает БД. Если мы долго хранили данные в очереди для отложенной обработки, а потом взяли и начали параллельно писать, получится 2× нагрузки — это очень много, БД может не справиться. Надо жестко зажимать консюмеры. Неважно, что исторические данные будут откачиваться подольше. Главное, что свежая телеметрия станет быстрее доступна для клиентов.

  • Каждая дополнительная очередь создает копию данных. То есть, грубо говоря, у вас в Source Queue лежит сообщение плюс реплика. Потом, когда мы переложили в Retry Queue, это опять еще одна копия сообщения, еще одна реплика. И Error Queue — это еще одно сообщение, еще одна реплика. Надо четко считать Capacity, понимать, сколько мы в таком режиме можем продержаться, пока Kafka не начнет дропать данные по Retention. 

Особая история есть про метрики.

Первая версия пайплайна записи метрик

Первая версия пайплайна записи метрик

Изначально пайплайн создавался как замена Prometheus, чисто под скрейпинг. Мы что-то читали с таргетов, потом складывали в Kafka, процессили в Metrics Worker, записывали в БД. У нас сначала даже VictoriaMetrics были в сингл-версиях, не кластерные. И все было хорошо, пока поток не начал расти и инстансы VictoriaMetrics не стали отказывать чаще.

В это же время начали заезжать крупные клиенты, огромные кластеры кубернетиса, которые пишут метрики по протоколу Prometheus Remote Write, что еще сильнее нагрузило пайплайн и БД.

Начали разбираться, что с этим делать, и выяснилось, что Prometheus Remote Write по объему данных оставил скрейпинг далеко позади. Мы изучили спецификацию протокола и нашли рекомендацию о том, чтобы на стороне писателей были специальные буферы для того, чтобы переживать кратковременные отказы. 

Мы подумали: круто, хорошая рекомендация. Проверили, кто в нас чаще всего пишет. Это Prometheus и VM-агенты. Проверили их реализацию. Оказалось, у них там есть действительно внутренние буферы, даже с персистом на диске. И есть воркеры, которые читают буферы и процессят. 

Ничего не напоминает? Kafka, буферы, воркеры. У них есть то же самое, что и у нас. Прекрасно, подумали мы и пустили этот поток напрямую. 

Добавили в пайплайн записи метрик прямую запись в БД для части потока

Добавили в пайплайн записи метрик прямую запись в БД для части потока

Все хорошо работало до первого крупного отказа на VictoriaMetrics. Стали теряться метрики, потому что буферы на стороне клиентов оказались маленькие. Они готовы принимать и держать какой-то объем метрик, но, допустим, этого хватает на несколько минут. А если клиент большой, а буферы у него недостаточно большие для такого размера, они вообще переполняются за какие-то секунды. 

Вторая проблема: как только база восстанавливается, запись начинает идти, клиенты нам с удовольствием возвращают долги — резко отдают данные, и получается огромный входящий рейт. А я напомню, Concurrency — это проблема. Боремся с ней. 

Подумали и сделали так: весь основной поток пытаемся писать напрямую. Таким образом, с минимальными задержками свежая телеметрия попадает в БД и доступна для поиска. Всем же хочется видеть свежие дашборды, актуальные алерты на свежих данных и так далее.

Финальный вариант пайплайна записи метрик: прямая запись в БД стала использоваться по умолчанию

Финальный вариант пайплайна записи метрик: прямая запись в БД стала использоваться по умолчанию

Если что-то стреляет на VictoriaMetrics, случается какая-то деградация, мы сразу переключаемся на fallback-механизм и начинаем скидывать данные в Kafka для отложенной записи. 

Но и когда с VictoriaMetrics все хорошо, какое-то количество данных все равно проходит через Kafka для сглаживания пиков входящего потока. Обычно пики небольшие и обработка таких данных занимает максимум пару сотен миллисекунд. В случае же серьезных пиков время обработки вырастет, но это радикально повышает надежность пайплайна. 

Антипаттерн «Шумный сосед»

Наверняка каждый из вас помнит историю, когда сосед по лестничной клетке купил себе новый перфоратор и через стенку сообщает о своей покупке. Для нас тоже есть такие шумные соседи. 

Когда мы делаем коммунальные системы, мы экономим за счет того, что шарим ресурсы между клиентами. Но при этом клиенты могут быть шумными, то есть забирают на себя больше ресурсов, чем мы ожидаем. Причина может быть в размере потока, его содержимом, профиле записи.

Я предлагаю схему из четырех шагов, чтобы с этим бороться: 

  1. Надо уметь мониторить поток клиентов независимо друг от друга. Нужно видеть, кто сколько льет и что у него поменялось. 

  2. Надо уметь изолировать клиентов друг от друга. И так можно будет складывать токсичный трафик на отдельные кластеры. 

  3. Обязательно надо добавлять какой-то механизм квотирования, троттлинга, рейт лимитеров в разные части пайплайна, которые будут контролировать поток и как-то поджимать клиентов независимо друг от друга. 

  4. Самое неприятное, но это необходимость для выживания систем. Нужно иметь рубильник для того, чтобы чей-то поток просто остановить. Лучше потерять данные одного клиента, чем потерять весь пайплайн. К сожалению, такова жизнь. 

Итоги

Вот мы и добрались до финала, до развязки. Предлагаю пройтись по всему материалу, закрепить его и разобрать два сценария. Когда все хорошо и когда все плохо. 

Начнем со сценария, когда все хорошо. Мы берем данные из основной очереди, читаем данные консюмером, pull-моделью. Потом batch-процессор пытается набрать нам Batch. После диспетчер пытается определить, куда же нам надо записать эти данные. Например, по какому-нибудь идентификатору клиента. Потом Bulkhead Pattern, то есть Concurrency Limiter, проверяет, доступны ли у него слоты для записи. После этого мы проверяем, кончились ли попытки для записи. Они у нас есть, в базе же все хорошо. Circuit Breaker замкнут, соответственно, реквест проходит дальше. Таймаут еще не стрельнул, то есть общее время не закончилось. 

Ура, друзья, мы справились, записали данные в хранилище.

Второй кейс не такой прикольный, потому что базе поплохело. И у нее по-другому все работает: прочитали данные, собрали батч, определили больное хранилище для записи. Дальше либо у нас кончатся слоты в Concurrency Limiter и это произойдет практически мгновенно, за какие-то миллисекунды. Либо у нас закончатся попытки на запись, на 100% они закончатся. Сколько их ни делай, если база померла, все равно закончатся. Circuit Breaker разомкнется, когда поймает много ошибок. Или у нас может просто закончиться время на выполнение, пока мы все эти машинерии делали. 

Во всех этих случаях мы идем нашим fallback-механизмом, складываем данные в Retry Queue и потом уже отложенно их процессим, как сможем.

Паттерны делятся на несколько групп, часть из них формирует архитектуру развертывания системы, другие влияют на ее поведение и возможности, некоторые направлены на обработку ошибок и разных нештатных ситуаций. Работая вместе, они дополняют друг друга, каждому паттерну есть свое определенное место в пайплайне, нельзя просто так менять их порядок. 

Все эти механизмы мы делали итеративно по мере поступления проблем, не сильно задумываясь, паттерны это или велосипеды. Было интересно оглянуться назад и осознать, что это все же паттерны чуть менее чем полностью. Можно ли было сразу посмотреть в каталог и набрать нужных компонент архитектуры пайплайна записи, как в тележку супермаркета? Наверное. Во всяком случае, теперь такая возможность есть, все ссылки на паттерны — ниже. Мы шли к результату чуть дольше, но путь был интересным и полным приключений.

Материалы к докладу


ссылка на оригинал статьи https://habr.com/ru/articles/858602/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *