Apache NiFi. Быстрый доступ к логам

от автора

Разработчики потоков обработки данных на сервисе Apache NiFi знают, что все интересующие события процессоров можно вывести в интерфейс в раздел «Bulletin Board». Однако, если вы в текущий момент не следите за интерфейсом, увидеть сообщение становится невозможным.

Apache NiFi хранит логи (по умолчанию) в папке «./logs». Процессоры пишут свои события в файл «nifi-app.log». И если у вас есть доступ к каталогу с логами, то никаких трудностей нет — открываем файл, читаем, фиксим поток. А если у вас нет доступа? Или есть желание оперативно получить сведения о событии, чтобы понять контекст его возникновения? Ситуации бывают разные. Рассмотрим простой способ получить информацию из лога.

Иные варианты получения данных из логов
  1. Применяем filebeat — ставим, настраиваем на папку, шлем логи в ELK.

  2. Настраиваем logback.xml — применяем логгер, отправляющий данные по сети на LogStash, например стандартный или кастомный.

Для чтения файла лога воспользуемся процессором TailFile, далее конвертируем полученные данные, разобьём на отдельные записи, извлечем данные в атрибуты и отправим себе в канал Slack:

Общий вид потока, читающего лог
Общий вид потока, читающего лог

Настройки TailFile — читаем один файл, указываем его расположение, начальную позицию и периодичность:

Параметры процессора TailFile
Параметры процессора TailFile

Далее преобразуем полученные данные с помощью ConvertRecord:

Параметры ConvertRecord
Параметры ConvertRecord

Чтение данных выполняется с помощью GrokReader, позволяющего структурировать текстовые данные. О Grok есть много публикаций, например тут от @chemtech.

Параметры GrokReader
Параметры GrokReader

В настройках указываем шаблон, и выставляем параметр, добавляющий данные к предыдущему сообщению, если они не подходят под шаблон.

Структуре лога соответствует выражение:

%{TIMESTAMP_ISO8601:date} %{LOGLEVEL:Level} \[%{DATA:thread}] %{DATA:logger} %{GREEDYDATA:message}

В итоге имеем Json вида:

{    "date" : "2022-06-01 23:34:02,905",   "Level" : "INFO",   "thread" : "pool-10-thread-1",   "logger" : "o.a.n.c.r.WriteAheadFlowFileRepository",   "message" : "Initiating checkpoint of FlowFile Repository",   "stackTrace" : null,   "_raw" : "ds (Stop-the-world time = 2 milliseconds, ..." }

Извлекаем данные из контента. в атрибуты с помощью EvaluateJsonPath:

Параметры EvaluateJsonPath
Параметры EvaluateJsonPath

Далее шлем сообщения в канал Slack:

Разбиваем сообщения по уровню, и отправляем в Slack
Разбиваем сообщения по уровню, и отправляем в Slack

Пример отправки в Slack:

Добавляю имя хоста, чтобы понимать, с какого инстанса идет сообщение
Добавляю имя хоста, чтобы понимать, с какого инстанса идет сообщение

В заключение скажу, что этот подход не является моей личной разработкой, известен давно, и можно найти различные примеры потоков Nifi, читающих логи. Как вариант, можно слать логи не в Slack, а в телеграмм или Elastic.

Такая структура не является панацеей, и не претендует на уровень production. По хорошему, все логи надо обрабатывать через инфраструктуру, для этого предназначенную — ELK, Zabbix и т.д.

Каким образом я использую такой вариант:

  1. При разработке потока создаю свой уровень логирования, и отлавливаю определенные события через LogEvent / LogAttribute.

  2. При долговременом тестировании потоков (test / stage) отлавливаю ошибки.

  3. На проде ловлю ошибки загрузки, чтобы успеть быстро загрузить вручную данные и успеть до старта загрузки DWH.

Полезные ссылки:

  1. Документация Apache Nifi.

  2. Полезный YouTube канал, где я и увидел принцип построения такого потока.

  3. Сообщество в телеграмм

Всем добра.


ссылка на оригинал статьи https://habr.com/ru/post/669174/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *