VRL — просто, подробно и понятно

от автора

Vector является очень гибким агрегатором сообщений и поддерживает собственный язык для обработки событий — VRL.

Поэтому предлагаю без лишних слов перейти к изучению его возможностей. Давайте напишем простейшую конфигурацию с использованием VRL, которая будет удалять поле из JSON файла

На входе будут такие данные:

{   "field1": "foo",   "field2": "bar" } 

Наша первая конфигурация на VRL будет выглядеть так:

sources:   file_input:     type: file     include:       - /opt/habr/vector/intro.json     ignore_checkpoints: true  transforms:   parse_file:     type: remap     inputs:       - file_input     source: |          .message = parse_json!(.message)         .field1 = .message.field1          del(.host)         del(.file)         del(.message)         del(.timestamp)         del(.source_type)  sinks:   test_output:     type: file     inputs:       - parse_file     path: /opt/habr/vector/intro_out.json     encoding:       codec: json 

И на выходе мы получим такую данные:

{   "field1": "foo" } 

Зачем в нашем трансформе такая часть с удалением других полей?

        del(.host)         del(.file)         del(.message)         del(.timestamp)         del(.source_type) 

Дело в том, что на источник данных в нашем примере — file, который содержит помимо самой строки, ещё и другие метаданные в событии.
Вот как бы выглядел наш вывод, если оставить эти поля:

{   "field1": "foo",   "file": "/opt/habr/vector/intro.json",   "host": "test-mon",   "message": {     "field1": "foo",     "field2": "bar"   },   "source_type": "file",   "timestamp": "2025-03-17T13:09:20.653470883Z" } 

Здесь стоит обратить внимание, что наше поле field1 встречается дважды. Правда в первом случае — это отдельный объект, поскольку мы объявили его внутри нашего трансформа, а во втором случае — это часть message, то есть содержимое нашей строки.

Полезной практикой при написании VRL будет создание отдельного файла под VRL. В нашем примере VRL получился совсем короткий, но бывают случаи, когда один VRL растягивается на 100, 200 и 300 строк. Поэтому хранить всё в одном файле будет неудобно, есть риск допустить ошибки в конфигурации, а также теряется читабельность.

Чтобы вынести VRL в отдельный файл, достаточно указать параметр «file» в transform

sources:   file_input:     type: file     include:       - /opt/habr/vector/intro.json     ignore_checkpoints: true  transforms:   parse_file:     type: remap     inputs:       - file_input     file: /etc/vector/habr/intro.vrl  sinks:   test_output:     type: file     inputs:       - parse_file     path: /opt/habr/vector/intro_out.json     encoding:       codec: json 

Теперь наша конфигурация выглядит компактнее, а с самим VRL можно работать в отдельном файле.

2. Синтаксис VRL

Теперь изучив базовую работу с VRL, можно изучить его синтаксис, чтобы начать работать с Vector более гибко.

Переменные

Переменные в VRL могут содержать числовые, строковые и логические значения. Их можно присвоить напрямую:

$ a=5                                                                 5  $ b = "some text" "some text"  $ pi = 3.14 3.14  $ k = true true 

Либо же можно присвоить значение как результат выполнения функции:

$ c = snakecase(b) "some_text" 

Также с числовыми переменными можно выполнять базовые математические операции:

$ a = 5 + 5 10  $ a = 5 - 2 3  $ a = 5 * 5 25  $ a = 5 / 5 1  $ a = 7 / 5 1.4  $ a = mod(5, 3) 2 

А также мы можем объединять вместе строковые переменные:

$ str1 = "some text" "some text"  $ str2 = "other text" "other text"  $ str1 + " and " + str2 "some text and other text" 

Также стоит отметить, что переменные, начинающиеся с точки — это JSON поля, которые пойдут дальше в sink, а без неё — это обычные переменные, которые существуют в рамках трансформа.

Массивы

VRL поддерживает работу с массивами. Массивы могут хранить в себе любые типы данных, такие как переменные, числа, строки или JSON объекты:

$ a = 5 5  $ arr = ["text", 5, 3.14, true, a, {"foo": "bar"}] ["text", 5, 3.14, true, 5, { "foo": "bar" }] 

Операторы и циклы

Полноценно VRL поддерживает только оператор if, как таковых других циклов по типу for или while здесь нет, привычных нам по большинству языков программирования, но в случае с for есть функция, которая выполняет схожий механизм, о котором поговорим чуть позже. А вот при попытке использовать цикл while, мы увидим лишь, что это ключевое слово зарезервировано для будущих нужд:

$ while  error[E205]: reserved keyword   ┌─ :1:1   │ 1 │ while   │ ^^^^^   │ │   │ this identifier name is reserved for future use in the language   │ use a different name instead   │   = see language documentation at https://vrl.dev   = try your code in the VRL REPL, learn more at https://vrl.dev/examples 

В целом, полезно ознакомиться со списком зарезервированных слов, чтобы избежать конфликтов — https://vector.dev/docs/reference/vrl/expressions/#keywords

Итак, начнём с простого оператора проверки условия if — else.
Здесь мы имеем вполне знакомый синтаксис:

if *условие* { *дейстиве* } else if *условие* { *дейтсвие* } else { *действие* } 

И попробуем это выполнить на реальном примере:

$ a = 3 if (a == 10) {   "first block" } else if (a <= 10) {   "second block" } else {   "third block" } "second block" 

Теперь перейдём к перебору массивов. Как я уже писал, на данный момент VRL не поддерживает цикл for и есть лишь функция, которая выполняет схожий механизм.

Рассмотрим функцию for_each. Для примера возьмём такой массив:

.arr = ["my", "first", "array", "loop"] 

И теперь попробуем применить функцию upcase для каждого элемента массива с помощью for_each:

$ .new_arr = [] $ $ for_each(.arr) -> |_index, value|{     .new_arr = push(.new_arr, upcase(value)) } $ $ .new_arr ["MY", "FIRST", "ARRAY", "LOOP"] $ .arr = del(.new_arr) ["MY", "FIRST", "ARRAY", "LOOP"] 

Как видим, здесь всё не так просто. Для начала нам надо создать новый массив, который будет хранить обработанные значения. Затем мы пробегаемся по массиву .arr, используя for_each. Выражение |_index, value|, означает, что мы будет работать только со значениями массива, и индексы нас не интересуют, поэтому будет обработан каждый элемент массива. Далее мы добавляем новое обработанное значение в массив .new_arr с помощью функции push. После чего мы проверяем содержимое временного массива, присваиваем его нашему исходному массиву и удаляем временный.

У вас мог возникнуть вопрос, почему мы не можем просто использовать исходный массив, и применить функцию upscale к каждому элементу? Но, к сожалению, VRL не позволяет нам работать с массивом таким образом:

$ for_each(.arr) -> |index, value| {     .arr[index] = upcase(value) }  error[E203]: syntax error   ┌─ :2:10   │ 2 │     .arr[index] = upcase(value)   │          ^^^^^   │          │   │          unexpected syntax token: "Identifier"   │          expected one of: "integer literal"   │   = see language documentation at https://vrl.dev   = try your code in the VRL REPL, learn more at https://vrl.dev/examples 

Но на самом деле мы можем обработать этот массив иначе, не прибегая к созданию временного массива. Для этого мы можем воспользоваться функцией map_value, которая позволяет обрабатывать значения:

$ map_values(.arr) -> |value| { upcase!(value) } ["MY", "FIRST", "ARRAY", "LOOP"] 

Теперь давайте рассмотрим аналогичное действие, но на это раз обработаем только чётные элементы массива .arr:

$ .new_arr = [] $ $ for_each(.arr) -> |index, value| {     if mod(index, 2) == 0 {         .new_arr = push(.new_arr, upcase(value))     } else{         .new_arr = push(.new_arr, value)     } } $ $ .new_arr ["MY", "first", "ARRAY", "loop"] $ .arr = del(.new_arr) ["MY", "first", "ARRAY", "loop"] 

Здесь мы вновь создаём наш временный массив, который будет хранить обработанные значения. На это раз мы указываем index без нижнего подчёркивания и в теле for_each проверяем значение индекса. Если его остаток от деления на 2 равен 0, тогда этот элемент массива .arr чётный и мы применяем к нему функцию upscale. В противном случаем элемент массива .arr будет добавлен массиву .new_arr без изменений.

Обратите внимание, на блок else. Его обязательно надо добавить в наш цикл, поскольку без него нечётные элементы просто будут отброшены:

$ .new_arr ["MY", ARRAY"] 

Обработка ошибок

Немало важной частью является обработка ошибок внутри вашего трансформа. При написании пайплайна могут возникать неоднозначные ситуации, которые требуют отдельного внимания.

Представим ситуацию, у нас есть файл с таким содержимым:

1,,3 

И мы хотим разбить каждое число в отдельное поле и умножить его на 5, разделив их запятой. Для этого напишем такой VRL:

arr = split(.message, ",")  .var1 = to_int(arr[0]) * 5 .var2 = to_int(arr[1]) * 5 .var3 = to_int(arr[2]) * 5 

А сам файл конфигурации будет выглядеть так:

sources:   file_input:     type: file     include:       - /opt/habr/vector/error.txt     ignore_checkpoints: true  transforms:   parse_file:     type: remap     inputs:       - file_input     file: /etc/vector/habr/errors.vrl  sinks:   test_output:     type: file     inputs:       - parse_file     path: /opt/habr/vector/error.json     encoding:       codec: json 

Теперь попробуем запустить vector с такой конфигурацией:

root@test-mon:/etc/vector/habr# vector --config errors.yaml  2025-07-16T14:30:36.222710Z  INFO vector::app: Log level is enabled. level="info" 2025-07-16T14:30:36.223878Z  INFO vector::app: Loading configs. paths=["errors.yaml"] 2025-07-16T14:30:36.226896Z ERROR vector::topology::builder: Configuration error. error=Transform "parse_file":  error[E103]: unhandled fallible assignment   ┌─ :1:7   │ 1 │ arr = split(.message, ",")   │ ----- ^^^^^^^^^^^^^^^^^^^^   │ │     │   │ │     this expression is fallible because at least one argument's type cannot be verified to be valid   │ │     update the expression to be infallible by adding a `!`: `split!(.message, ",")`   │ │     `.message` argument type is `any` and this function expected a parameter `value` of type `string` 

Во-первых, мы видим, что vector сразу ругается на функцию split, поскольку такая запись может привести к ошибкам. В нашем случае мы полностью уверены в нашем выборе, поэтому обозначим функцию split с символом «!», чтобы вектор проигнорировал потенциально опасную запись и продолжил работу:

arr = split!(.message, ",") 

И попробуем запустить vector:

root@test-mon:/etc/vector/habr# vector --config errors.yaml  2025-07-16T14:35:09.037986Z  INFO vector::app: Log level is enabled. level="info" 2025-07-16T14:35:09.039181Z  INFO vector::app: Loading configs. paths=["errors.yaml"] 2025-07-16T14:35:09.042971Z ERROR vector::topology::builder: Configuration error. error=Transform "parse_file":  error[E103]: unhandled fallible assignment   ┌─ :3:9   │ 3 │ .var1 = to_int(arr[0]) * 5   │ ------- ^^^^^^^^^^^^^^^^^^ this expression is fallible because at least one argument's type cannot be verified to be valid 

И вновь непорядок, вновь vector падает из-за ошибки. На это раз ему не нравится функция to_int.
Представим, что мы очень уверенные в себе люди, и так же добавим «!» к функции

.var1 = to_int!(arr[0]) * 5 .var2 = to_int!(arr[1]) * 5 .var3 = to_int!(arr[2]) * 5 

Наверное теперь всё должно пройти хорошо и vector не свалится в ошибку:

root@test-mon:/etc/vector/habr# vector --config errors.yaml  2025-07-16T14:36:22.005848Z  INFO vector::app: Log level is enabled. level="info" 2025-07-16T14:36:22.006867Z  INFO vector::app: Loading configs. paths=["errors.yaml"] 2025-07-16T14:36:22.010056Z  INFO vector::topology::running: Running healthchecks. 2025-07-16T14:36:22.010141Z  INFO vector::topology::builder: Healthcheck passed. 2025-07-16T14:36:22.010142Z  INFO vector: Vector has started. debug="false" version="0.45.0" arch="x86_64" revision="063cabb 2025-02-24 14:52:02.810034614" 2025-07-16T14:36:22.010167Z  INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`. 2025-07-16T14:36:22.010197Z  INFO source{component_kind="source" component_id=file_input component_type=file}: vector::sources::file: Starting file server. include=["/opt/habr/vector/error.txt"] exclude=[] 2025-07-16T14:36:22.010491Z  INFO source{component_kind="source" component_id=file_input component_type=file}:file_server: file_source::checkpointer: Loaded checkpoint data. 2025-07-16T14:36:22.010623Z  INFO source{component_kind="source" component_id=file_input component_type=file}:file_server: vector::internal_events::file::source: Found new file to watch. file=/opt/habr/vector/error.txt 2025-07-16T14:36:22.010881Z ERROR transform{component_kind="transform" component_id=parse_file component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"to_int\" at (65:80): Invalid integer \"\": cannot parse integer from empty string" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true 

Вроде бы всё хорошо, в этот раз vector не упал. Но в логе можно увидеть одно сообщение:

2025-07-16T14:36:22.010881Z ERROR transform{component_kind="transform" component_id=parse_file component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"to_int\" at (65:80): Invalid integer \"\": cannot parse integer from empty string" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true 

В этой записи, мы видим, что на вход функции to_int поступило пустое число. Что же тогда vector записал в файл вывода?

{"file":"/opt/habr/vector/error.txt","host":"test-mon","message":"1,,3","source_type":"file","timestamp":"2025-07-16T14:36:22.010766422Z"} 

А в выходном файле у нас записано событие целиком, поскольку оно обработалось с ошибкой.

Этот пример отчётливо показывает, что не стоит злоупотреблять знаками «!», а нужно их использовать только в том случае, если вы уверены, что ваша форма записи отработает корректно.

Но как же нам тогда поступить в этой ситуации? Для начала стоит отметить, зачем мы используем функцию to_int. Дело в том, что после обработки функцией split, мы получаем на выходе все элементы массива arr в string. Поэтому мы не можем просто умножить на 5, нам нужно привести данные к int. И в нашем исходном файле есть пропущенное число, поэтому такой случай надо отдельно обозначить:

.var1, err = to_int(arr[0]) * 5 .var2, err = to_int(arr[1]) * 5 .var3, err = to_int(arr[2]) * 5 

Посмотрим, как теперь выглядят строки в выходном файле:

{"file":"/opt/habr/vector/error.txt","host":"test-mon","message":"1,,3","source_type":"file","timestamp":"2025-07-16T17:29:05.314777030Z","var1":5,"var2":0,"var3":15} 

Видно, что теперь у нас обработались наши поля, а поле с пустым значением приравнялось у нулю. Всё потому что arr[1] у нас null и при преобразовании через `to_int, мы получаем 0

Применяем VRL на практике

Теперь давайте попробуем закрепить все полученные знания на реальном примере.
Представим такой кейс, что у нас есть банковское приложение для перевода средств между клиентами. И это приложение генерирует данные по каждой транзакции.

Возьмём за основу такие данные с такой структурой полей:

"account_receiver": "234567891012", "account_sender": "750", "auth_channel_end": "PS", "auth_channel_start": "PS", "auth_context_end": "PS,43604000B000DA02,2025-02-14T10:10:28+03:00", "auth_context_start": "PS,43604000B000DA02,2025-02-14T10:10:28+03:00", "authorization_time": "2025-01-21 08:10:35", "branch_code": "123456789101", "channel_type": "3", "compliance_rules": "345678912101,91,[],[],,AC", "counterparty_id_a": "", "counterparty_id_b": "urn:86661307-056805-0", "customer_external_id": "436046005035947", "customer_id": "234567891012", "customer_internal_id": "", "device_type": "1", "entry_time_end": "2025-02-14 10:10:28", "entry_time_start": "2025-02-14 10:10:28", "entrypoint_node_end": "43604000B000DA02", "entrypoint_node_start": "43604000B000DA02", "forwarding": "", "forwarding_reason": "", "operation_code": "234567891012", "processing_code": "900", "processing_time_sec": "55", "security_flag": "", "session_token_a": "jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4", "session_token_b": "678F728F12B9D00030BA8@2.3.4.5:1234", "session_token_c": "", "status_http": "", "transaction_end_time": "2025-02-14 10:11:17", "transaction_id": "51ab65e0-0805-498f-8508-3cc4cd459ffd", "transaction_start_time": "2025-01-21 08:10:22", "transaction_status_code": "16", "transaction_type": "522", "used_services_raw": "OIP,override,0,,,2025-02-14T10:10:23+03:00,,,,[];CAT,,0,,,2025-02-14T10:10:31+03:00,,,,[]", "user_interaction_time_sec": "43" 

И сами строки в сыром виде, будут выглядеть так:

2025-02-14T10:11:17+03:00;522;123456789101;51ab65e0-0805-498f-8508-3cc4cd459ffd;234567891012;2025-01-21T10:10:22+05:00;2025-01-21T10:10:35+05:00;234567891012;750;234567  891012;;;3;1;jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4;678F728F12B9D00030BA8@2.3.4.5:1234;;;436046005035947;;urn:86661307-056805-0;PS,43604000B000DA02,2025-02-14T10:10:2  8+03:00;PS,43604000B000DA02,2025-02-14T10:10:28+03:00;"";900;"345678912101,91,[],[],,AC";"OIP,override,0,,,2025-02-14T10:10:23+03:00,,,,[];CAT,,0,,,2025-02-14T10:10:31+  03:00,,,,[]";16;;55;43; 

Теперь представим себе такую задачу: наша банковская система генерирует данные в таком формате, а партнёру надо преобразовать данные в удобный для него формат. Нам необходимо будет поменять формат меток времени из RFC 3339 в простой DateTime формат, а также нам надо заменить разделители в compliance_rules и used_services_raw с «;» на «&»

Для этого набросаем такой VRL. Для начала нам надо разбить строку на части, поскольку количество элементов не фиксировано, и просто разбить через «;» не получится, то мы поделим сообщение на части с помощью регулярного выражения по ключевым моментам в каждом сообщении:

.message = string!(.message) .message = parse_regex!(.message, r'^(?P<log_part1>.*?);"(?P<log_security_flag>)";(?P<log_processing_code>[^;]*);"(?P<log_compliance_rules>[^"]*)";"(?P<log_used_service s_raw>[^"]*)";(?P<log_part2>.*)$') .part1 = .message.log_part1 .security_flag = .message.log_security_flag .processing_code = .message.log_processing_code .compliance_rules = .message.log_compliance_rules .used_services_raw = .message.log_used_services_raw .part2 = .message.log_part2 

Здесь мы разбиваем сообщение на несколько частей, чтобы в дальнейшем их обработать отдельно. Также мы сразу создаём уже вычлененные поля в отдельные поля.

Теперь перейдём к обработке первой части сообщения:

.part1 = string(.part1) arr = split(.part1, ";") .transaction_end_time, err = if arr[0] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {     "" } .transaction_type = to_string(arr[1]) .branch_code = to_string(arr[2]) .transaction_id = to_string(arr[3]) .customer_id = to_string(arr[4]) .transaction_start_time, err = if arr[5] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[5], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {     "" } .authorization_time, err = if arr[6] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[6], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {     "" } .operation_code = to_string(arr[7]) .account_sender = to_string(arr[8]) .account_receiver = to_string(arr[9]) .forwarding = to_string(arr[10]) .forwarding_reason = to_string(arr[11]) .channel_type = to_string(arr[12]) .device_type = to_string(arr[13]) .session_token_a = to_string(arr[14]) .session_token_b = to_string(arr[15]) .session_token_c = to_string(arr[16]) .customer_internal_id = to_string(arr[17]) .customer_external_id = to_string(arr[18]) .counterparty_id_a = to_string(arr[19]) .counterparty_id_b = to_string(arr[20]) .auth_context_start = to_string(arr[21]) .auth_context_end = to_string(arr[22]) 

Разберём это детальнее. Для начала мы разбиваем первую часть с помощью «;» и записываем это в массив arr:

.part1 = string(.part1) arr = split(.part1, ";") 

После чего мы получаем мы преобразуем наш первый timestamp transaction_end_time в нужный формат.

.transaction_end_time, err = if arr[0] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {     "" } 

Для начала мы проверяем, что первый элемент массива arr не пустой, в противном случае он останется пустым, после чего мы преобразуем нашу метку времени в обычный unix timestamp, преобразуем его в int и прибавляем к нему 10800 (3 часа). Это необходимо для того, чтобы не сбить наше время от таймзоны, поскольку на выходе у нас получится unix timestamp в GMT +0. И после этого мы преобразуем наш unix timestamp в нужный формат времени. Далее мы будем применять аналогичный способ для всех меток времени.
А также создаём отдельные поля:

.transaction_type = to_string(arr[1]) ... .auth_context_end = to_string(arr[22]) 

Аналогичным образом мы поступим и для auth_context_start и auth_context_end

.auth_context_start = string(.auth_context_start) arr1 = split(.auth_context_start, ",")  .auth_channel_start = to_string(arr1[0]) .entrypoint_node_start = to_string(arr1[1]) .entry_time_start, err = if arr1[2] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr1[2], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {     "" }   .auth_context_end = string(.auth_context_end) arr2 = split(.auth_context_end, ",")  .auth_channel_end = to_string(arr2[0]) .entrypoint_node_end = to_string(arr2[1]) .entry_time_end, err = if arr2[2] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr2[2], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {      "" } 

Дальше мы поступим очень интересно. Внутри compliance_rules может идти список из нескольких правил, но в наем случае всегда одно. Мы будем менять разделитель «;» между правилами на «&»:

.compliance_rules = replace(.compliance_rules, ";", "&") 

А также нам надо заменить запятые внутри квадратный скобок. Для этого мы воспользуемся replace_with, чтобы найти внутри compliance_rules блок с фигурными скобками и заменить внутри запятые:

.compliance_rules, err = replace_with(.compliance_rules, r'\[(.*?)\]') -> |m| {     "[" + replace(m.captures[0], ",", "&") + "]" } 

Здесь передаём функции compliance_rules и задаём регулярное выражение для поиска блока с квадратными скобками. Полученное совпадение записывается в переменную m и уже внутри неё меняются запятые на «&».

Далее нам надо преобразовать метку внутри used_services_raw:

.used_services_raw = string(.used_services_raw) .services_info = split(.used_services_raw, ";") .parsed_services_info = []  for_each(.services_info) -> |_index, value| {     next_service = split(value, ",")      service_call_time, err = if next_service[5] != "" {         unix_timestamp = to_unix_timestamp(parse_timestamp!(next_service[5], "%+"))         unix_timestamp = to_int(unix_timestamp) + 10800         format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")     } else {         ""     }      obj = {         "service_name": to_string(next_service[0]),         "service_mode": to_string(next_service[1]),         "number_of_diversions": to_string(next_service[2]),         "assoc_party_id": to_string(next_service[3]),         "service_id": to_string(next_service[4]),         "service_call_time": service_call_time,         "number_of_participants": to_string(next_service[6]),         "action_type": to_string(next_service[7]),         "customer_group_tag": to_string(next_service[8]),         "service_cost_info": to_string(next_service[9])     }      .parsed_services_info = push(.parsed_services_info, obj) } 

Внутри used_services_raw у нас может идти сколько угодно сервисов, поэтому мы разбиваем его с помощью «;» и записываем в массив service_info.
Далее мы парсим каждый сервис из массива, пробежавшись по нему функцией for_each. Внутри мы по знакомой схеме меняем timestamp, записываем это в словарь obj и записываем его в отдельное поле.

Затем мы идёт простая часть с вычленением полей из второй части сообщения:

.part2 = string(.part2) arr3 = split(.part2, ";") .transaction_status_code = to_string(arr3[0]) .status_http = to_string(arr3[1]) .processing_time_sec = to_string(arr3[2]) .user_interaction_time_sec = to_string(arr3[3]) 

И теперь нам остаётся собрать воедино все разбитые поля в .message, чтобы записать обработанную строку в выходной файл.

.message =      .transaction_end_time + ";" +         .transaction_type + ";" +         .branch_code + ";" +         .transaction_id + ";" +         .customer_id + ";" +         .transaction_start_time + ";" +         .authorization_time + ";" +         .operation_code + ";" +         .account_sender + ";" +         .account_receiver + ";" +         .forwarding + ";" +         .forwarding_reason + ";" +         .channel_type + ";" +         .device_type + ";" +         .session_token_a + ";" +         .session_token_b + ";" +         .session_token_c + ";" +         .customer_internal_id + ";" +         .customer_external_id + ";" +         .counterparty_id_a + ";" +         .counterparty_id_b + ";" +         .auth_channel_start + "," +         .entrypoint_node_start + "," +         .entry_time_start + ";" +         .auth_channel_end + "," +         .entrypoint_node_end + "," +         .entry_time_end + ";\"" +          .security_flag + "\";" +         .processing_code + ";\"" +         .compliance_rules + "\";\""  for_each(.parsed_services_info) -> |index, obj| {     if index != 0 {         .message = .message + "&"     }     .message = .message +                 obj.service_name + "," +                obj.service_mode + "," +                obj.number_of_diversions + "," +                obj.assoc_party_id + "," +                obj.service_id + "," +                obj.service_call_time + "," +                obj.number_of_participants + "," +                obj.action_type + "," +                obj.customer_group_tag + "," +                obj.service_cost_info         }   .message = .message + "\";" +                 .transaction_status_code + ";" +                 .status_http + ";" +                 .processing_time_sec + ";" +                 .user_interaction_time_sec + ";" 

Здесь мы поочерёдно прописываем в нужном порядке наши поля, добавляя к ним разделители.
Отдельно можно выделить секцию с parsed_services_info. Нам также надо заменить разделитель между сервисами на «&». Поэтому мы поочерёдно записываем содержимое словарей в message, а также добавляем перед ним «&». Но здесь, стоит заметить, что у нас добавлена проверка для первого элемента, чтобы перед ним не ставился разделитель, поскольку перед ним нет другого сервиса, иначе первый сервис тоже бы начинался с разделителя, что неверно.

Конечный VRL будет выглядеть так:

.message = string!(.message) .message = parse_regex!(.message, r'^(?P<log_part1>.*?);"(?P<log_security_flag>)";(?P<log_processing_code>[^;]*);"(?P<log_compliance_rules>[^"]*)";"(?P<log_used_services_raw>[^"]*)";(?P<log_part2>.*)$') .part1 = .message.log_part1 .security_flag = .message.log_security_flag .processing_code = .message.log_processing_code .compliance_rules = .message.log_compliance_rules .used_services_raw = .message.log_used_services_raw .part2 = .message.log_part2  .part1 = string(.part1) arr = split(.part1, ";") .transaction_end_time, err = if arr[0] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {     "" } .transaction_type = to_string(arr[1]) .branch_code = to_string(arr[2]) .transaction_id = to_string(arr[3]) .customer_id = to_string(arr[4]) .transaction_start_time, err = if arr[5] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[5], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {     "" } .authorization_time, err = if arr[6] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[6], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {     "" } .operation_code = to_string(arr[7]) .account_sender = to_string(arr[8]) .account_receiver = to_string(arr[9]) .forwarding = to_string(arr[10]) .forwarding_reason = to_string(arr[11]) .channel_type = to_string(arr[12]) .device_type = to_string(arr[13]) .session_token_a = to_string(arr[14]) .session_token_b = to_string(arr[15]) .session_token_c = to_string(arr[16]) .customer_internal_id = to_string(arr[17]) .customer_external_id = to_string(arr[18]) .counterparty_id_a = to_string(arr[19]) .counterparty_id_b = to_string(arr[20]) .auth_context_start = to_string(arr[21]) .auth_context_end = to_string(arr[22])   .auth_context_start = string(.auth_context_start) arr1 = split(.auth_context_start, ",")  .auth_channel_start = to_string(arr1[0]) .entrypoint_node_start = to_string(arr1[1]) .entry_time_start, err = if arr1[2] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr1[2], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {     "" }   .auth_context_end = string(.auth_context_end) arr2 = split(.auth_context_end, ",")  .auth_channel_end = to_string(arr2[0]) .entrypoint_node_end = to_string(arr2[1]) .entry_time_end, err = if arr2[2] != "" {     unix_timestamp = to_unix_timestamp(parse_timestamp!(arr2[2], "%+"))     unix_timestamp = to_int(unix_timestamp) + 10800     format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else {      "" }   .compliance_rules = replace(.compliance_rules, ";", "&")  .compliance_rules, err = replace_with(.compliance_rules, r'\[(.*?)\]') -> |m| {     "[" + replace(m.captures[0], ",", "&") + "]" }  .used_services_raw = string(.used_services_raw) .services_info = split(.used_services_raw, ";") .parsed_services_info = []  for_each(.services_info) -> |_index, value| {     next_service = split(value, ",")              service_call_time, err = if next_service[5] != "" {         unix_timestamp = to_unix_timestamp(parse_timestamp!(next_service[5], "%+"))         unix_timestamp = to_int(unix_timestamp) + 10800         format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S")     } else {         ""     }                  obj = {         "service_name": to_string(next_service[0]),         "service_mode": to_string(next_service[1]),         "number_of_diversions": to_string(next_service[2]),         "assoc_party_id": to_string(next_service[3]),         "service_id": to_string(next_service[4]),         "service_call_time": service_call_time,         "number_of_participants": to_string(next_service[6]),         "action_type": to_string(next_service[7]),         "customer_group_tag": to_string(next_service[8]),         "service_cost_info": to_string(next_service[9])     }                  .parsed_services_info = push(.parsed_services_info, obj) }   .part2 = string(.part2) arr3 = split(.part2, ";") .transaction_status_code = to_string(arr3[0]) .status_http = to_string(arr3[1]) .processing_time_sec = to_string(arr3[2]) .user_interaction_time_sec = to_string(arr3[3])   .message =      .transaction_end_time + ";" +         .transaction_type + ";" +         .branch_code + ";" +         .transaction_id + ";" +         .customer_id + ";" +         .transaction_start_time + ";" +         .authorization_time + ";" +         .operation_code + ";" +         .account_sender + ";" +         .account_receiver + ";" +         .forwarding + ";" +         .forwarding_reason + ";" +         .channel_type + ";" +         .device_type + ";" +         .session_token_a + ";" +         .session_token_b + ";" +         .session_token_c + ";" +         .customer_internal_id + ";" +         .customer_external_id + ";" +         .counterparty_id_a + ";" +         .counterparty_id_b + ";" +         .auth_channel_start + "," +         .entrypoint_node_start + "," +         .entry_time_start + ";" +         .auth_channel_end + "," +         .entrypoint_node_end + "," +         .entry_time_end + ";\"" +          .security_flag + "\";" +         .processing_code + ";\"" +         .compliance_rules + "\";\""  for_each(.parsed_services_info) -> |index, obj| {     if index != 0 {         .message = .message + "&"     }     .message = .message +                 obj.service_name + "," +                obj.service_mode + "," +                obj.number_of_diversions + "," +                obj.assoc_party_id + "," +                obj.service_id + "," +                obj.service_call_time + "," +                obj.number_of_participants + "," +                obj.action_type + "," +                obj.customer_group_tag + "," +                obj.service_cost_info         }   .message = .message + "\";" +                 .transaction_status_code + ";" +                 .status_http + ";" +                 .processing_time_sec + ";" +                 .user_interaction_time_sec + ";"   del(.host) del(.file) del(.timestamp) del(.source_type) 

И на выходе мы получаем такую обработанную строку:

2025-02-14 10:11:17;522;123456789101;51ab65e0-0805-498f-8508-3cc4cd459ffd;234567891012;2025-01-21 08:10:22;2025-01-21 08:10:35;234567891012;750;234567891012;;;3;1;jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4;678F728F12B9D00030BA8@2.3.4.5:1234;;;436046005035947;;urn:86661307-056805-0;PS,43604000B000DA02,2025-02-14 10:10:28;PS,43604000B000DA02,2025-02-14 10:10:28;"";900;"345678912101,91,[],[],,AC";"OIP,override,0,,,2025-02-14 10:10:23,,,,[]&CAT,,0,,,2025-02-14 10:10:31,,,,[]";16;;55;43; 

Заключение

Надеюсь, мне удалось вам донести принцип работы с VRL. Мы рассмотрели синтаксис и базовые функции для работы с данными. А также попробовали применить полученные знания на практике. Не стоит сильно заострят внимание на требованиях к обработке данных из примера, поскольку главной задачей было показать базовый подход к обработке данных, а также зацепить нестандартные методы.
Для будет приятна, если эта статья может вам освоить такой замечательный инструмент как vector и поможем вам составлять пайплайны для обработки своих данных.


ссылка на оригинал статьи https://habr.com/ru/articles/933762/