Работа с потоком логов в реальном времени с помощью Heka. Опыт Яндекс.Денег

от автора

image alt text

В этом материале я расскажу о том, как в Яндекс.Деньгах организована система сбора и доставки серверных логов платежных сервисов. Доставкой логов я занимаюсь весь последний год, и за это время накопилось достаточно нюансов, чтобы поделиться опытом с общественностью.

Система построена на базе стека EHK (Elasticsearch/Heka/Kibana), с прицелом на работу практически в реальном времени. Особый упор сделаю на тонкие места и нюансы обработки миллиардов строк текста в сутки.

Я верю в три вещи: мониторинг, логи и бэкапы.

Для быстрого доступа к оперативным логам в Яндекс.Деньгах используется ElasticSearch, получающий данные из почти сотни микросервисов, которые и составляют платежный сервис.

Под «оперативными» я подразумеваю логи, доступные в реальном времени: задержка доставки новых записей от файла лога до кластера Elasticsearch сейчас составляет менее секунды. Служба мониторинга всегда точно знает состояние сервиса в любой момент, а разработчики могут оперативно оценивать и корректировать поведение новой версии каждого микросервиса сразу после релиза.

Но так было не всегда. Когда пять месяцев назад я пришел в компанию, мне поставили задачу наладить работу по доставке оперативных логов сервиса до кластера ElasticSearch (далее просто ES). На тот момент использовались четыре схемы их разбора и доставки до ES:

  • Heka –> TCP output –> Heka –> ES.

  • Heka –> Apache Kafka –> Apache Flume –> ES.

  • SyslogNg –> ES.

  • nxlog –> nxlog –> ES.

Почти все они работали неидеально: кластер Kafka был ненадежен, Flume периодически вис, отчего ES впадал в ступор.

Логи – это на самом деле очень простая вещь: множество файлов, которые на боевых серверах быстро растут в объемах. Поэтому простые решения в этом случае — самые надежные.

Так как «чем больше в зонтике сочленений — тем выше вероятность его поломки», то я выбросил все неработающие и неоправданно сложные схемы и остановился на одной, в которой роль обработчика и транспорта выполняла бы Heka.

image alt text

Heka забирает логи и по протоколу TCP отправляет их в другую Heka для дальнейшей пересылки в ElasticSearch Cluster.

Идея следующая: на каждом сервере установлена Heka, логи всех сервисов этого сервера упаковываются в бинарный протокол Protobuf и отправляются на приемную Heka для разбора, фильтрации и отправки в ES.

Почему не классический стек ELK и почему deprecated Heka, а не Logstash

С точки зрения конфигурации и состава ПО наш кластер ES выглядит следующим образом:

  • ElasticSearch 2.2.

  • Kibana 4.4.2.

  • Две мастер-ноды: Intel Xeon 2x E5-2660, 64 ГБ ОЗУ, 2x 146 ГБ RAID-10.

  • Клиентская нода с установленной Kibana: Intel Xeon 2xE5-2660, 64 ГБ ОЗУ, 2×146 ГБ RAID-10.

  • Четыре дата-ноды: Intel Xeon 2x E5-2640 v3; 512 ГБ ОЗУ, 3×16 ТБ RAID-10.

Все ноды ES находятся в одной локальной сети и подключены к одному маршрутизатору, что позволяет осуществлять общение внутри кластера по транспортному протоколу на максимальной скорости. Такая организация заметно ускоряет размещение индексов и rebalance кластера.

В современном мире стек Elasticsearch/Logstash/Kibana стал уже практически стандартом де-факто для работы с логами. И если против Elasticsearch и Kibana нет никаких возражений, то с Logstash есть один нюанс — он создан на jRuby (написанный на Java интерпретатор языка Ruby) и требует наличия в системе JVM. Учитывая, что Яндекс.Деньги – это множество микросервисов, размещенных на сотнях физических серверов и виртуальных контейнеров, ставить в каждый из них тяжеловесные Logstash и JAVA было бы неправильно. На Heka выбор пал из-за ее простоты, надежности, легкости, умения фильтровать проходящие сообщения и отличной буферизации данных.

Что касается статуса продукта (deprecated) – для нас это не аргумент. Лук и стрелы для военных целей тоже deprecated, но это никак не мешает вам выстрелить кому-нибудь в голову с гарантированным результатом. Если же понадобится, например, нестандартный плагин или обработчик, то доработать продукт поможет целый штат опытных разработчиков.

Но это всё была теория, а при переходе к практике начались проблемы.

Дьявол прятался в объеме данных

Учитывая финансовую специфику нашей работы, почти все сервисы пишут много разной информации в логи. Для примера и понимания масштаба: объем логов некоторых из компонентов системы доходит до 250 тысяч строк в минуту.

Ни одна Heka, на каком бы мощном железе она ни стояла, в одиночку такой объём не обработает — неизбежны проседания производительности и потеря данных. Ни то, ни другое, конечно же, совершенно недопустимо, поэтому на помощь приходит HAProxy. Итоговая схема получилась следующей:

image alt text

На схеме изображено общее направление трафика логов от Heka-источников до связки HAProxy + Heka.

На каждом сервере стоит одна Heka, собирающая логи микросервисов этого сервера. Данные собираются, пакуются в Protobuf и по TCP уходят на балансировщик нагрузки, обслуживающий дата-центр. Бэкендами работают HAProxy, расположенные непосредственно на дата-нодах ES, за которыми стоят пулы Heka. В свою очередь, они принимают данные, переупаковывают их в ESJson и отправляют в локальную дата-ноду по протоколу HTTP.

… и в разных форматах лог-файлов

Несмотря на то, что основным языком в компании является Java и логи выводятся через стандартную библиотеку log4j, не было единого принятого формата на момент постройки «кластера мечты». Каждый микросервис писал логи собственного типа, включая вариации в наборе выводимых полей и форматах даты-времени. Не хотелось ждать, пока разработка приведёт логи к общим форматам, поэтому движение к цели было параллельным. Одновременно с разбором существующих форматов логов заводились задачи на доработку, а по мере релиза новых версий с правильными форматами менялись и настройки сборщиков.

Перейдем к самому соку — нюансам настройки

Поведение Heka описывается .toml файлами, которые при запуске собираются в единую конфигурацию, и в зависимости от описанных блоков Heka строит модель обработки данных.

Каждый блок проходит через несколько этапов:

  • Input — входной поток (это может быть файл, TCP/UDP input, данные, прочитанные из Kafka, событий Docker контейнеров и т.п.).

  • Splitter — тут мы указываем начало и конец каждого блока данных в потоке. Обязательный этап для многострочных данных вроде Java Stacktrace.

  • Decoder — описывает правила декодирования входящих данных. Мы в основном используем Regex декодеры.

  • Filters — этап фильтрации и изменения данных.

  • Encoders — кодирование потока данных под формат получателя.

  • Output — здесь мы описываем, как и куда данные должны отправиться.

Все эти этапы — просто плагины на Go и Lua. При необходимости можно написать что-нибудь своё. Например, плагин-фильтр на Lua, который будет отсекать отправку в ES записей о мониторинговых запросах к сервису; или вырезать из логов конфиденциальные данные.

Хека в древнеегипетской мифологии — бог магии. И то, что Heka позволяет делать с логами — просто волшебно.

Параметры сервера-источника логов

Разберем конфигурацию Heka на примере сервера-источника логов и файла service.toml.

[money-service-log] type = "LogstreamerInput" log_directory = "/var/log/tomcat" file_match = "money-service.log" splitter = "RegexSplitter" decoder = "service_decoder"

Простейший случай — один файл, ротация происходит системными средствами. Если файлов много и они различаются по форматам, то следует описывать каждый парой input/decoder. За более детальным описанием лучше обратиться к официальной документации. Если что-то остается непонятным — обязательно спрашивайте в комментариях.

[RegexSplitter] delimiter = '\n(\[\d\d\d\d-\d\d-\d\d)' delimiter_eol = false

Так как логи могут быть многострочными (те же stacktraces), не забываем про RegexSplitter, который даёт Heka понять, где заканчивается один блок текста и начинается другой.

[service_decoder] type = "PayloadRegexDecoder" match_regex = '^\[(?P<timestamp>\d{4}-\d{2}-\d{2}T[\d:.\+]+\])\s+(?P<level>[A-Z]+)\s+\[(?P<thread>.*)\]\s+\[(?P<context>\S*)\]\s+\[(?P<traceid>\S*)\]\s+\[(?P<unilabel>\S*)\]\s\[(?P<class>\S+)\]\s-\s(?P<msg>.*)'  log_errors = true     [service_decoder.message_fields]     @timestamp = "%timestamp%"     level = "%level%"     thread = "%thread%"     context = "%context%"     traceid = "%traceid%"     unilabel = "%unilabel%"     class = "%class%"     msg = "%msg%"

В match_regex описываем строки лога регулярным выражением в стандарте языка Go. Регулярные выражения в Go практически совпадают со стандартным PCRE, но есть ряд нюансов, из-за которых Heka может отказаться стартовать. Например, некоторые реализации PCRE простят такой синтаксис:

(?<groupname>.*)

А вот GOLANG — не простит.

С помощью параметра log_errors собираем все ошибки в отдельный лог — они понадобятся позже.

[ServiceOutput] type = "TcpOutput" address = "loadbalancer.server.address:port" keep_alive = true message_matcher = "Logger == 'money-appname'" use_buffering = true      [ServiceOutput.buffering]     max_file_size = 100857600     max_buffer_size = 1073741824     full_action = "block"

Буферизация выходящего потока — одна из отличных возможностей Heka. По умолчанию она хранит выходной буфер по следующему пути:

/var/cache/hekad/output_queue/OutputName

В настройках мы ограничиваем размер каждого файла буфера объемом 100 МБ, а также устанавливаем суммарный размер кеша для каждого Output-модуля в 1 ГБ. Параметр full_action может принимать три значения:

  • shutdown — при переполнении буфера Heka останавливается;

  • drop — при переполнении буфера он начинает работать как стек, удаляя старые сообщения в очереди;

  • block — при заполнении буфера Heka приостанавливает все операции и ждёт, пока не появится возможность отправить данные.

С block вы гарантированно не потеряете ни одной строки лога. Единственный минус в том, что при обрыве соединения или деградации канала вы получите резкий скачок трафика при возобновлении связи. Это связано с тем, что Heka отправляет накопленный буфер, пытаясь вернуться к обработке в реальном времени. Приёмный пул нужно проектировать с запасом, обязательно учитывая возможность таких ситуаций, иначе можно с лёгкостью провернуть DDoS самих себя.

Кстати, по поводу использования двойных и одинарных кавычек в конфигурации Heka — подразумевается следующее:

  • Значения переменных в одинарных кавычках по умолчанию рассматриваются как raw string. Спецсимволы экранировать не нужно.

  • Значения переменных в двойных кавычках рассматриваются как обычный текст и требуют двойного экранирования при использовании в них регулярных выражений.

Этот нюанс в свое время попортил мне немало крови.

Конфигурация бэкендa

Бэкенд для балансировщика — это связка из HAProxy и трёх экземпляров Heka на каждой дата-ноде ES.

В HAProxy всё довольно просто и пояснений, как мне кажется, не требует:

listen pool_502     bind 0.0.0.0:502     balance roundrobin     default-server fall 5 inter 5000 weight 10     server heka_1502 127.0.0.1:1502 check     server heka_2502 127.0.0.1:2502 check     server heka_3502 127.0.0.1:3502 check

На каждом сервере запущено три инстанса Heka, отличающихся только портами:

[Input_502_1] type = "TcpInput" address = "0.0.0.0:1502" keep_alive = true keep_alive_period = 180 decoder = "MultiDecoder_502"  [MultiDecoder_502] type = "MultiDecoder" subs = ['Service1Decoder', 'Service2Decoder'] cascade_strategy = "first-wins" log_sub_errors = true

В конфигурации используется MultiDecoder, так как через один порт проходят логи многих сервисов. Политика first-wins означает, что после первого совпадения дальнейший перебор декодеров прекращается.

[Service1Decoder] type = "ProtobufDecoder"  [Service2Decoder] type = "ProtobufDecoder"  [Service1Encoder] type = "ESJsonEncoder" index = "service1-logs-%{%Y.%m.%d}" es_index_from_timestamp = false type_name = "%{Logger}" fields = [ "DynamicFields", "Payload", "Hostname" ] dynamic_fields = ["@timestamp", "level", "thread", "context", "traceid", "unilabel", "class", "msg"]         [Service1Encoder.field_mappings]         Payload = "message"         Hostname = "MessageSourceAddress"

Параметр es_index_from_timestamp нужен для указания, что дата и время для формирования имени индекса берутся не из приходящих данных, а из локального времени сервера. Позволяет избежать бардака, когда серверы работают в разных временных зонах и кто-то пишет в логи время в UTC, а кто-то в MSK.

В параметре index реализуется принцип «один сервис – один индекс», новый индекс создается каждые сутки.

[Service1Output] type = "ElasticSearchOutput" message_matcher = "Logger == 'money-service1''" server = "http://localhost:9200" encoder = "Service1Encoder" use_buffering = true

Плагины Output разбирают поток данных на основании параметра message_matcher, соответствующего имени файла лога. Чтобы не перегружать сеть, Heka отправляет данные в локальную дата-ноду, на которой установлена. А уже дальше ES сам рассылает индексированные данные по транспортному протоколу между дата-нодами кластера.

Заключение

Описанная выше схема успешно работает и индексирует по 25-30 тысяч записей в секунду. Запас прочности приемных пулов Heka позволяет выдерживать пики нагрузки до 100 тыс. записей/сек:

image alt text

Статистика из Zabbix.

Мы храним оперативные данные за последний 21 день. Опыт показывает, что к более старым данным оперативный доступ нужен крайне редко. Но если понадобится, их всегда можно запросить с лог-серверов, хранящих данные практически вечно.

image alt text

Текущее состояние кластера по данным Kopf.

Я описал только ту часть системы, которая относится к сбору и доставке логов, поэтому в следующей статье собираюсь рассказать про сам кластер ElasticSearch и его настройку. Думаю рассказать, как мы его виртуализировали, как переезжали с версии 2.2 на 5.3 и перевозили с собой 24 миллиарда записей, не потеряв при этом веру в человечество.

Может быть, добавить к рассказу что-то еще, какие-то упущенные из виду детали? Поделитесь мнением в комментариях.

ссылка на оригинал статьи https://habrahabr.ru/post/328018/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *