Vector является очень гибким агрегатором сообщений и поддерживает собственный язык для обработки событий — VRL.
Поэтому предлагаю без лишних слов перейти к изучению его возможностей. Давайте напишем простейшую конфигурацию с использованием VRL, которая будет удалять поле из JSON файла
На входе будут такие данные:
{ "field1": "foo", "field2": "bar" }
Наша первая конфигурация на VRL будет выглядеть так:
sources: file_input: type: file include: - /opt/habr/vector/intro.json ignore_checkpoints: true transforms: parse_file: type: remap inputs: - file_input source: | .message = parse_json!(.message) .field1 = .message.field1 del(.host) del(.file) del(.message) del(.timestamp) del(.source_type) sinks: test_output: type: file inputs: - parse_file path: /opt/habr/vector/intro_out.json encoding: codec: json
И на выходе мы получим такую данные:
{ "field1": "foo" }
Зачем в нашем трансформе такая часть с удалением других полей?
del(.host) del(.file) del(.message) del(.timestamp) del(.source_type)
Дело в том, что на источник данных в нашем примере — file, который содержит помимо самой строки, ещё и другие метаданные в событии.
Вот как бы выглядел наш вывод, если оставить эти поля:
{ "field1": "foo", "file": "/opt/habr/vector/intro.json", "host": "test-mon", "message": { "field1": "foo", "field2": "bar" }, "source_type": "file", "timestamp": "2025-03-17T13:09:20.653470883Z" }
Здесь стоит обратить внимание, что наше поле field1 встречается дважды. Правда в первом случае — это отдельный объект, поскольку мы объявили его внутри нашего трансформа, а во втором случае — это часть message, то есть содержимое нашей строки.
Полезной практикой при написании VRL будет создание отдельного файла под VRL. В нашем примере VRL получился совсем короткий, но бывают случаи, когда один VRL растягивается на 100, 200 и 300 строк. Поэтому хранить всё в одном файле будет неудобно, есть риск допустить ошибки в конфигурации, а также теряется читабельность.
Чтобы вынести VRL в отдельный файл, достаточно указать параметр «file» в transform
sources: file_input: type: file include: - /opt/habr/vector/intro.json ignore_checkpoints: true transforms: parse_file: type: remap inputs: - file_input file: /etc/vector/habr/intro.vrl sinks: test_output: type: file inputs: - parse_file path: /opt/habr/vector/intro_out.json encoding: codec: json
Теперь наша конфигурация выглядит компактнее, а с самим VRL можно работать в отдельном файле.
2. Синтаксис VRL
Теперь изучив базовую работу с VRL, можно изучить его синтаксис, чтобы начать работать с Vector более гибко.
Переменные
Переменные в VRL могут содержать числовые, строковые и логические значения. Их можно присвоить напрямую:
$ a=5 5 $ b = "some text" "some text" $ pi = 3.14 3.14 $ k = true true
Либо же можно присвоить значение как результат выполнения функции:
$ c = snakecase(b) "some_text"
Также с числовыми переменными можно выполнять базовые математические операции:
$ a = 5 + 5 10 $ a = 5 - 2 3 $ a = 5 * 5 25 $ a = 5 / 5 1 $ a = 7 / 5 1.4 $ a = mod(5, 3) 2
А также мы можем объединять вместе строковые переменные:
$ str1 = "some text" "some text" $ str2 = "other text" "other text" $ str1 + " and " + str2 "some text and other text"
Также стоит отметить, что переменные, начинающиеся с точки — это JSON поля, которые пойдут дальше в sink, а без неё — это обычные переменные, которые существуют в рамках трансформа.
Массивы
VRL поддерживает работу с массивами. Массивы могут хранить в себе любые типы данных, такие как переменные, числа, строки или JSON объекты:
$ a = 5 5 $ arr = ["text", 5, 3.14, true, a, {"foo": "bar"}] ["text", 5, 3.14, true, 5, { "foo": "bar" }]
Операторы и циклы
Полноценно VRL поддерживает только оператор if, как таковых других циклов по типу for или while здесь нет, привычных нам по большинству языков программирования, но в случае с for есть функция, которая выполняет схожий механизм, о котором поговорим чуть позже. А вот при попытке использовать цикл while, мы увидим лишь, что это ключевое слово зарезервировано для будущих нужд:
$ while error[E205]: reserved keyword ┌─ :1:1 │ 1 │ while │ ^^^^^ │ │ │ this identifier name is reserved for future use in the language │ use a different name instead │ = see language documentation at https://vrl.dev = try your code in the VRL REPL, learn more at https://vrl.dev/examples
В целом, полезно ознакомиться со списком зарезервированных слов, чтобы избежать конфликтов — https://vector.dev/docs/reference/vrl/expressions/#keywords
Итак, начнём с простого оператора проверки условия if — else.
Здесь мы имеем вполне знакомый синтаксис:
if *условие* { *дейстиве* } else if *условие* { *дейтсвие* } else { *действие* }
И попробуем это выполнить на реальном примере:
$ a = 3 if (a == 10) { "first block" } else if (a <= 10) { "second block" } else { "third block" } "second block"
Теперь перейдём к перебору массивов. Как я уже писал, на данный момент VRL не поддерживает цикл for и есть лишь функция, которая выполняет схожий механизм.
Рассмотрим функцию for_each. Для примера возьмём такой массив:
.arr = ["my", "first", "array", "loop"]
И теперь попробуем применить функцию upcase для каждого элемента массива с помощью for_each:
$ .new_arr = [] $ $ for_each(.arr) -> |_index, value|{ .new_arr = push(.new_arr, upcase(value)) } $ $ .new_arr ["MY", "FIRST", "ARRAY", "LOOP"] $ .arr = del(.new_arr) ["MY", "FIRST", "ARRAY", "LOOP"]
Как видим, здесь всё не так просто. Для начала нам надо создать новый массив, который будет хранить обработанные значения. Затем мы пробегаемся по массиву .arr, используя for_each. Выражение |_index, value|, означает, что мы будет работать только со значениями массива, и индексы нас не интересуют, поэтому будет обработан каждый элемент массива. Далее мы добавляем новое обработанное значение в массив .new_arr с помощью функции push. После чего мы проверяем содержимое временного массива, присваиваем его нашему исходному массиву и удаляем временный.
У вас мог возникнуть вопрос, почему мы не можем просто использовать исходный массив, и применить функцию upscale к каждому элементу? Но, к сожалению, VRL не позволяет нам работать с массивом таким образом:
$ for_each(.arr) -> |index, value| { .arr[index] = upcase(value) } error[E203]: syntax error ┌─ :2:10 │ 2 │ .arr[index] = upcase(value) │ ^^^^^ │ │ │ unexpected syntax token: "Identifier" │ expected one of: "integer literal" │ = see language documentation at https://vrl.dev = try your code in the VRL REPL, learn more at https://vrl.dev/examples
Но на самом деле мы можем обработать этот массив иначе, не прибегая к созданию временного массива. Для этого мы можем воспользоваться функцией map_value, которая позволяет обрабатывать значения:
$ map_values(.arr) -> |value| { upcase!(value) } ["MY", "FIRST", "ARRAY", "LOOP"]
Теперь давайте рассмотрим аналогичное действие, но на это раз обработаем только чётные элементы массива .arr:
$ .new_arr = [] $ $ for_each(.arr) -> |index, value| { if mod(index, 2) == 0 { .new_arr = push(.new_arr, upcase(value)) } else{ .new_arr = push(.new_arr, value) } } $ $ .new_arr ["MY", "first", "ARRAY", "loop"] $ .arr = del(.new_arr) ["MY", "first", "ARRAY", "loop"]
Здесь мы вновь создаём наш временный массив, который будет хранить обработанные значения. На это раз мы указываем index без нижнего подчёркивания и в теле for_each проверяем значение индекса. Если его остаток от деления на 2 равен 0, тогда этот элемент массива .arr чётный и мы применяем к нему функцию upscale. В противном случаем элемент массива .arr будет добавлен массиву .new_arr без изменений.
Обратите внимание, на блок else. Его обязательно надо добавить в наш цикл, поскольку без него нечётные элементы просто будут отброшены:
$ .new_arr ["MY", ARRAY"]
Обработка ошибок
Немало важной частью является обработка ошибок внутри вашего трансформа. При написании пайплайна могут возникать неоднозначные ситуации, которые требуют отдельного внимания.
Представим ситуацию, у нас есть файл с таким содержимым:
1,,3
И мы хотим разбить каждое число в отдельное поле и умножить его на 5, разделив их запятой. Для этого напишем такой VRL:
arr = split(.message, ",") .var1 = to_int(arr[0]) * 5 .var2 = to_int(arr[1]) * 5 .var3 = to_int(arr[2]) * 5
А сам файл конфигурации будет выглядеть так:
sources: file_input: type: file include: - /opt/habr/vector/error.txt ignore_checkpoints: true transforms: parse_file: type: remap inputs: - file_input file: /etc/vector/habr/errors.vrl sinks: test_output: type: file inputs: - parse_file path: /opt/habr/vector/error.json encoding: codec: json
Теперь попробуем запустить vector с такой конфигурацией:
root@test-mon:/etc/vector/habr# vector --config errors.yaml 2025-07-16T14:30:36.222710Z INFO vector::app: Log level is enabled. level="info" 2025-07-16T14:30:36.223878Z INFO vector::app: Loading configs. paths=["errors.yaml"] 2025-07-16T14:30:36.226896Z ERROR vector::topology::builder: Configuration error. error=Transform "parse_file": error[E103]: unhandled fallible assignment ┌─ :1:7 │ 1 │ arr = split(.message, ",") │ ----- ^^^^^^^^^^^^^^^^^^^^ │ │ │ │ │ this expression is fallible because at least one argument's type cannot be verified to be valid │ │ update the expression to be infallible by adding a `!`: `split!(.message, ",")` │ │ `.message` argument type is `any` and this function expected a parameter `value` of type `string`
Во-первых, мы видим, что vector сразу ругается на функцию split, поскольку такая запись может привести к ошибкам. В нашем случае мы полностью уверены в нашем выборе, поэтому обозначим функцию split с символом «!», чтобы вектор проигнорировал потенциально опасную запись и продолжил работу:
arr = split!(.message, ",")
И попробуем запустить vector:
root@test-mon:/etc/vector/habr# vector --config errors.yaml 2025-07-16T14:35:09.037986Z INFO vector::app: Log level is enabled. level="info" 2025-07-16T14:35:09.039181Z INFO vector::app: Loading configs. paths=["errors.yaml"] 2025-07-16T14:35:09.042971Z ERROR vector::topology::builder: Configuration error. error=Transform "parse_file": error[E103]: unhandled fallible assignment ┌─ :3:9 │ 3 │ .var1 = to_int(arr[0]) * 5 │ ------- ^^^^^^^^^^^^^^^^^^ this expression is fallible because at least one argument's type cannot be verified to be valid
И вновь непорядок, вновь vector падает из-за ошибки. На это раз ему не нравится функция to_int.
Представим, что мы очень уверенные в себе люди, и так же добавим «!» к функции
.var1 = to_int!(arr[0]) * 5 .var2 = to_int!(arr[1]) * 5 .var3 = to_int!(arr[2]) * 5
Наверное теперь всё должно пройти хорошо и vector не свалится в ошибку:
root@test-mon:/etc/vector/habr# vector --config errors.yaml 2025-07-16T14:36:22.005848Z INFO vector::app: Log level is enabled. level="info" 2025-07-16T14:36:22.006867Z INFO vector::app: Loading configs. paths=["errors.yaml"] 2025-07-16T14:36:22.010056Z INFO vector::topology::running: Running healthchecks. 2025-07-16T14:36:22.010141Z INFO vector::topology::builder: Healthcheck passed. 2025-07-16T14:36:22.010142Z INFO vector: Vector has started. debug="false" version="0.45.0" arch="x86_64" revision="063cabb 2025-02-24 14:52:02.810034614" 2025-07-16T14:36:22.010167Z INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`. 2025-07-16T14:36:22.010197Z INFO source{component_kind="source" component_id=file_input component_type=file}: vector::sources::file: Starting file server. include=["/opt/habr/vector/error.txt"] exclude=[] 2025-07-16T14:36:22.010491Z INFO source{component_kind="source" component_id=file_input component_type=file}:file_server: file_source::checkpointer: Loaded checkpoint data. 2025-07-16T14:36:22.010623Z INFO source{component_kind="source" component_id=file_input component_type=file}:file_server: vector::internal_events::file::source: Found new file to watch. file=/opt/habr/vector/error.txt 2025-07-16T14:36:22.010881Z ERROR transform{component_kind="transform" component_id=parse_file component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"to_int\" at (65:80): Invalid integer \"\": cannot parse integer from empty string" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true
Вроде бы всё хорошо, в этот раз vector не упал. Но в логе можно увидеть одно сообщение:
2025-07-16T14:36:22.010881Z ERROR transform{component_kind="transform" component_id=parse_file component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"to_int\" at (65:80): Invalid integer \"\": cannot parse integer from empty string" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true
В этой записи, мы видим, что на вход функции to_int поступило пустое число. Что же тогда vector записал в файл вывода?
{"file":"/opt/habr/vector/error.txt","host":"test-mon","message":"1,,3","source_type":"file","timestamp":"2025-07-16T14:36:22.010766422Z"}
А в выходном файле у нас записано событие целиком, поскольку оно обработалось с ошибкой.
Этот пример отчётливо показывает, что не стоит злоупотреблять знаками «!», а нужно их использовать только в том случае, если вы уверены, что ваша форма записи отработает корректно.
Но как же нам тогда поступить в этой ситуации? Для начала стоит отметить, зачем мы используем функцию to_int. Дело в том, что после обработки функцией split, мы получаем на выходе все элементы массива arr в string. Поэтому мы не можем просто умножить на 5, нам нужно привести данные к int. И в нашем исходном файле есть пропущенное число, поэтому такой случай надо отдельно обозначить:
.var1, err = to_int(arr[0]) * 5 .var2, err = to_int(arr[1]) * 5 .var3, err = to_int(arr[2]) * 5
Посмотрим, как теперь выглядят строки в выходном файле:
{"file":"/opt/habr/vector/error.txt","host":"test-mon","message":"1,,3","source_type":"file","timestamp":"2025-07-16T17:29:05.314777030Z","var1":5,"var2":0,"var3":15}
Видно, что теперь у нас обработались наши поля, а поле с пустым значением приравнялось у нулю. Всё потому что arr[1] у нас null и при преобразовании через `to_int, мы получаем 0
Применяем VRL на практике
Теперь давайте попробуем закрепить все полученные знания на реальном примере.
Представим такой кейс, что у нас есть банковское приложение для перевода средств между клиентами. И это приложение генерирует данные по каждой транзакции.
Возьмём за основу такие данные с такой структурой полей:
"account_receiver": "234567891012", "account_sender": "750", "auth_channel_end": "PS", "auth_channel_start": "PS", "auth_context_end": "PS,43604000B000DA02,2025-02-14T10:10:28+03:00", "auth_context_start": "PS,43604000B000DA02,2025-02-14T10:10:28+03:00", "authorization_time": "2025-01-21 08:10:35", "branch_code": "123456789101", "channel_type": "3", "compliance_rules": "345678912101,91,[],[],,AC", "counterparty_id_a": "", "counterparty_id_b": "urn:86661307-056805-0", "customer_external_id": "436046005035947", "customer_id": "234567891012", "customer_internal_id": "", "device_type": "1", "entry_time_end": "2025-02-14 10:10:28", "entry_time_start": "2025-02-14 10:10:28", "entrypoint_node_end": "43604000B000DA02", "entrypoint_node_start": "43604000B000DA02", "forwarding": "", "forwarding_reason": "", "operation_code": "234567891012", "processing_code": "900", "processing_time_sec": "55", "security_flag": "", "session_token_a": "jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4", "session_token_b": "678F728F12B9D00030BA8@2.3.4.5:1234", "session_token_c": "", "status_http": "", "transaction_end_time": "2025-02-14 10:11:17", "transaction_id": "51ab65e0-0805-498f-8508-3cc4cd459ffd", "transaction_start_time": "2025-01-21 08:10:22", "transaction_status_code": "16", "transaction_type": "522", "used_services_raw": "OIP,override,0,,,2025-02-14T10:10:23+03:00,,,,[];CAT,,0,,,2025-02-14T10:10:31+03:00,,,,[]", "user_interaction_time_sec": "43"
И сами строки в сыром виде, будут выглядеть так:
2025-02-14T10:11:17+03:00;522;123456789101;51ab65e0-0805-498f-8508-3cc4cd459ffd;234567891012;2025-01-21T10:10:22+05:00;2025-01-21T10:10:35+05:00;234567891012;750;234567 891012;;;3;1;jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4;678F728F12B9D00030BA8@2.3.4.5:1234;;;436046005035947;;urn:86661307-056805-0;PS,43604000B000DA02,2025-02-14T10:10:2 8+03:00;PS,43604000B000DA02,2025-02-14T10:10:28+03:00;"";900;"345678912101,91,[],[],,AC";"OIP,override,0,,,2025-02-14T10:10:23+03:00,,,,[];CAT,,0,,,2025-02-14T10:10:31+ 03:00,,,,[]";16;;55;43;
Теперь представим себе такую задачу: наша банковская система генерирует данные в таком формате, а партнёру надо преобразовать данные в удобный для него формат. Нам необходимо будет поменять формат меток времени из RFC 3339 в простой DateTime формат, а также нам надо заменить разделители в compliance_rules и used_services_raw с «;» на «&»
Для этого набросаем такой VRL. Для начала нам надо разбить строку на части, поскольку количество элементов не фиксировано, и просто разбить через «;» не получится, то мы поделим сообщение на части с помощью регулярного выражения по ключевым моментам в каждом сообщении:
.message = string!(.message) .message = parse_regex!(.message, r'^(?P<log_part1>.*?);"(?P<log_security_flag>)";(?P<log_processing_code>[^;]*);"(?P<log_compliance_rules>[^"]*)";"(?P<log_used_service s_raw>[^"]*)";(?P<log_part2>.*)$') .part1 = .message.log_part1 .security_flag = .message.log_security_flag .processing_code = .message.log_processing_code .compliance_rules = .message.log_compliance_rules .used_services_raw = .message.log_used_services_raw .part2 = .message.log_part2
Здесь мы разбиваем сообщение на несколько частей, чтобы в дальнейшем их обработать отдельно. Также мы сразу создаём уже вычлененные поля в отдельные поля.
Теперь перейдём к обработке первой части сообщения:
.part1 = string(.part1) arr = split(.part1, ";") .transaction_end_time, err = if arr[0] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } .transaction_type = to_string(arr[1]) .branch_code = to_string(arr[2]) .transaction_id = to_string(arr[3]) .customer_id = to_string(arr[4]) .transaction_start_time, err = if arr[5] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[5], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } .authorization_time, err = if arr[6] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[6], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } .operation_code = to_string(arr[7]) .account_sender = to_string(arr[8]) .account_receiver = to_string(arr[9]) .forwarding = to_string(arr[10]) .forwarding_reason = to_string(arr[11]) .channel_type = to_string(arr[12]) .device_type = to_string(arr[13]) .session_token_a = to_string(arr[14]) .session_token_b = to_string(arr[15]) .session_token_c = to_string(arr[16]) .customer_internal_id = to_string(arr[17]) .customer_external_id = to_string(arr[18]) .counterparty_id_a = to_string(arr[19]) .counterparty_id_b = to_string(arr[20]) .auth_context_start = to_string(arr[21]) .auth_context_end = to_string(arr[22])
Разберём это детальнее. Для начала мы разбиваем первую часть с помощью «;» и записываем это в массив arr:
.part1 = string(.part1) arr = split(.part1, ";")
После чего мы получаем мы преобразуем наш первый timestamp transaction_end_time в нужный формат.
.transaction_end_time, err = if arr[0] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" }
Для начала мы проверяем, что первый элемент массива arr не пустой, в противном случае он останется пустым, после чего мы преобразуем нашу метку времени в обычный unix timestamp, преобразуем его в int и прибавляем к нему 10800 (3 часа). Это необходимо для того, чтобы не сбить наше время от таймзоны, поскольку на выходе у нас получится unix timestamp в GMT +0. И после этого мы преобразуем наш unix timestamp в нужный формат времени. Далее мы будем применять аналогичный способ для всех меток времени.
А также создаём отдельные поля:
.transaction_type = to_string(arr[1]) ... .auth_context_end = to_string(arr[22])
Аналогичным образом мы поступим и для auth_context_start и auth_context_end
.auth_context_start = string(.auth_context_start) arr1 = split(.auth_context_start, ",") .auth_channel_start = to_string(arr1[0]) .entrypoint_node_start = to_string(arr1[1]) .entry_time_start, err = if arr1[2] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr1[2], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } .auth_context_end = string(.auth_context_end) arr2 = split(.auth_context_end, ",") .auth_channel_end = to_string(arr2[0]) .entrypoint_node_end = to_string(arr2[1]) .entry_time_end, err = if arr2[2] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr2[2], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" }
Дальше мы поступим очень интересно. Внутри compliance_rules может идти список из нескольких правил, но в наем случае всегда одно. Мы будем менять разделитель «;» между правилами на «&»:
.compliance_rules = replace(.compliance_rules, ";", "&")
А также нам надо заменить запятые внутри квадратный скобок. Для этого мы воспользуемся replace_with, чтобы найти внутри compliance_rules блок с фигурными скобками и заменить внутри запятые:
.compliance_rules, err = replace_with(.compliance_rules, r'\[(.*?)\]') -> |m| { "[" + replace(m.captures[0], ",", "&") + "]" }
Здесь передаём функции compliance_rules и задаём регулярное выражение для поиска блока с квадратными скобками. Полученное совпадение записывается в переменную m и уже внутри неё меняются запятые на «&».
Далее нам надо преобразовать метку внутри used_services_raw:
.used_services_raw = string(.used_services_raw) .services_info = split(.used_services_raw, ";") .parsed_services_info = [] for_each(.services_info) -> |_index, value| { next_service = split(value, ",") service_call_time, err = if next_service[5] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(next_service[5], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } obj = { "service_name": to_string(next_service[0]), "service_mode": to_string(next_service[1]), "number_of_diversions": to_string(next_service[2]), "assoc_party_id": to_string(next_service[3]), "service_id": to_string(next_service[4]), "service_call_time": service_call_time, "number_of_participants": to_string(next_service[6]), "action_type": to_string(next_service[7]), "customer_group_tag": to_string(next_service[8]), "service_cost_info": to_string(next_service[9]) } .parsed_services_info = push(.parsed_services_info, obj) }
Внутри used_services_raw у нас может идти сколько угодно сервисов, поэтому мы разбиваем его с помощью «;» и записываем в массив service_info.
Далее мы парсим каждый сервис из массива, пробежавшись по нему функцией for_each. Внутри мы по знакомой схеме меняем timestamp, записываем это в словарь obj и записываем его в отдельное поле.
Затем мы идёт простая часть с вычленением полей из второй части сообщения:
.part2 = string(.part2) arr3 = split(.part2, ";") .transaction_status_code = to_string(arr3[0]) .status_http = to_string(arr3[1]) .processing_time_sec = to_string(arr3[2]) .user_interaction_time_sec = to_string(arr3[3])
И теперь нам остаётся собрать воедино все разбитые поля в .message, чтобы записать обработанную строку в выходной файл.
.message = .transaction_end_time + ";" + .transaction_type + ";" + .branch_code + ";" + .transaction_id + ";" + .customer_id + ";" + .transaction_start_time + ";" + .authorization_time + ";" + .operation_code + ";" + .account_sender + ";" + .account_receiver + ";" + .forwarding + ";" + .forwarding_reason + ";" + .channel_type + ";" + .device_type + ";" + .session_token_a + ";" + .session_token_b + ";" + .session_token_c + ";" + .customer_internal_id + ";" + .customer_external_id + ";" + .counterparty_id_a + ";" + .counterparty_id_b + ";" + .auth_channel_start + "," + .entrypoint_node_start + "," + .entry_time_start + ";" + .auth_channel_end + "," + .entrypoint_node_end + "," + .entry_time_end + ";\"" + .security_flag + "\";" + .processing_code + ";\"" + .compliance_rules + "\";\"" for_each(.parsed_services_info) -> |index, obj| { if index != 0 { .message = .message + "&" } .message = .message + obj.service_name + "," + obj.service_mode + "," + obj.number_of_diversions + "," + obj.assoc_party_id + "," + obj.service_id + "," + obj.service_call_time + "," + obj.number_of_participants + "," + obj.action_type + "," + obj.customer_group_tag + "," + obj.service_cost_info } .message = .message + "\";" + .transaction_status_code + ";" + .status_http + ";" + .processing_time_sec + ";" + .user_interaction_time_sec + ";"
Здесь мы поочерёдно прописываем в нужном порядке наши поля, добавляя к ним разделители.
Отдельно можно выделить секцию с parsed_services_info. Нам также надо заменить разделитель между сервисами на «&». Поэтому мы поочерёдно записываем содержимое словарей в message, а также добавляем перед ним «&». Но здесь, стоит заметить, что у нас добавлена проверка для первого элемента, чтобы перед ним не ставился разделитель, поскольку перед ним нет другого сервиса, иначе первый сервис тоже бы начинался с разделителя, что неверно.
Конечный VRL будет выглядеть так:
.message = string!(.message) .message = parse_regex!(.message, r'^(?P<log_part1>.*?);"(?P<log_security_flag>)";(?P<log_processing_code>[^;]*);"(?P<log_compliance_rules>[^"]*)";"(?P<log_used_services_raw>[^"]*)";(?P<log_part2>.*)$') .part1 = .message.log_part1 .security_flag = .message.log_security_flag .processing_code = .message.log_processing_code .compliance_rules = .message.log_compliance_rules .used_services_raw = .message.log_used_services_raw .part2 = .message.log_part2 .part1 = string(.part1) arr = split(.part1, ";") .transaction_end_time, err = if arr[0] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[0], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } .transaction_type = to_string(arr[1]) .branch_code = to_string(arr[2]) .transaction_id = to_string(arr[3]) .customer_id = to_string(arr[4]) .transaction_start_time, err = if arr[5] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[5], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } .authorization_time, err = if arr[6] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr[6], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } .operation_code = to_string(arr[7]) .account_sender = to_string(arr[8]) .account_receiver = to_string(arr[9]) .forwarding = to_string(arr[10]) .forwarding_reason = to_string(arr[11]) .channel_type = to_string(arr[12]) .device_type = to_string(arr[13]) .session_token_a = to_string(arr[14]) .session_token_b = to_string(arr[15]) .session_token_c = to_string(arr[16]) .customer_internal_id = to_string(arr[17]) .customer_external_id = to_string(arr[18]) .counterparty_id_a = to_string(arr[19]) .counterparty_id_b = to_string(arr[20]) .auth_context_start = to_string(arr[21]) .auth_context_end = to_string(arr[22]) .auth_context_start = string(.auth_context_start) arr1 = split(.auth_context_start, ",") .auth_channel_start = to_string(arr1[0]) .entrypoint_node_start = to_string(arr1[1]) .entry_time_start, err = if arr1[2] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr1[2], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } .auth_context_end = string(.auth_context_end) arr2 = split(.auth_context_end, ",") .auth_channel_end = to_string(arr2[0]) .entrypoint_node_end = to_string(arr2[1]) .entry_time_end, err = if arr2[2] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(arr2[2], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } .compliance_rules = replace(.compliance_rules, ";", "&") .compliance_rules, err = replace_with(.compliance_rules, r'\[(.*?)\]') -> |m| { "[" + replace(m.captures[0], ",", "&") + "]" } .used_services_raw = string(.used_services_raw) .services_info = split(.used_services_raw, ";") .parsed_services_info = [] for_each(.services_info) -> |_index, value| { next_service = split(value, ",") service_call_time, err = if next_service[5] != "" { unix_timestamp = to_unix_timestamp(parse_timestamp!(next_service[5], "%+")) unix_timestamp = to_int(unix_timestamp) + 10800 format_timestamp(from_unix_timestamp!(unix_timestamp), "%Y-%m-%d %H:%M:%S") } else { "" } obj = { "service_name": to_string(next_service[0]), "service_mode": to_string(next_service[1]), "number_of_diversions": to_string(next_service[2]), "assoc_party_id": to_string(next_service[3]), "service_id": to_string(next_service[4]), "service_call_time": service_call_time, "number_of_participants": to_string(next_service[6]), "action_type": to_string(next_service[7]), "customer_group_tag": to_string(next_service[8]), "service_cost_info": to_string(next_service[9]) } .parsed_services_info = push(.parsed_services_info, obj) } .part2 = string(.part2) arr3 = split(.part2, ";") .transaction_status_code = to_string(arr3[0]) .status_http = to_string(arr3[1]) .processing_time_sec = to_string(arr3[2]) .user_interaction_time_sec = to_string(arr3[3]) .message = .transaction_end_time + ";" + .transaction_type + ";" + .branch_code + ";" + .transaction_id + ";" + .customer_id + ";" + .transaction_start_time + ";" + .authorization_time + ";" + .operation_code + ";" + .account_sender + ";" + .account_receiver + ";" + .forwarding + ";" + .forwarding_reason + ";" + .channel_type + ";" + .device_type + ";" + .session_token_a + ";" + .session_token_b + ";" + .session_token_c + ";" + .customer_internal_id + ";" + .customer_external_id + ";" + .counterparty_id_a + ";" + .counterparty_id_b + ";" + .auth_channel_start + "," + .entrypoint_node_start + "," + .entry_time_start + ";" + .auth_channel_end + "," + .entrypoint_node_end + "," + .entry_time_end + ";\"" + .security_flag + "\";" + .processing_code + ";\"" + .compliance_rules + "\";\"" for_each(.parsed_services_info) -> |index, obj| { if index != 0 { .message = .message + "&" } .message = .message + obj.service_name + "," + obj.service_mode + "," + obj.number_of_diversions + "," + obj.assoc_party_id + "," + obj.service_id + "," + obj.service_call_time + "," + obj.number_of_participants + "," + obj.action_type + "," + obj.customer_group_tag + "," + obj.service_cost_info } .message = .message + "\";" + .transaction_status_code + ";" + .status_http + ";" + .processing_time_sec + ";" + .user_interaction_time_sec + ";" del(.host) del(.file) del(.timestamp) del(.source_type)
И на выходе мы получаем такую обработанную строку:
2025-02-14 10:11:17;522;123456789101;51ab65e0-0805-498f-8508-3cc4cd459ffd;234567891012;2025-01-21 08:10:22;2025-01-21 08:10:35;234567891012;750;234567891012;;;3;1;jrccpjjjqhkcj2ar2k0srqrqa20qanc2@1.2.3.4;678F728F12B9D00030BA8@2.3.4.5:1234;;;436046005035947;;urn:86661307-056805-0;PS,43604000B000DA02,2025-02-14 10:10:28;PS,43604000B000DA02,2025-02-14 10:10:28;"";900;"345678912101,91,[],[],,AC";"OIP,override,0,,,2025-02-14 10:10:23,,,,[]&CAT,,0,,,2025-02-14 10:10:31,,,,[]";16;;55;43;
Заключение
Надеюсь, мне удалось вам донести принцип работы с VRL. Мы рассмотрели синтаксис и базовые функции для работы с данными. А также попробовали применить полученные знания на практике. Не стоит сильно заострят внимание на требованиях к обработке данных из примера, поскольку главной задачей было показать базовый подход к обработке данных, а также зацепить нестандартные методы.
Для будет приятна, если эта статья может вам освоить такой замечательный инструмент как vector и поможем вам составлять пайплайны для обработки своих данных.
ссылка на оригинал статьи https://habr.com/ru/articles/933762/
Добавить комментарий