Добрый день. Меня зовут Иван Клименко, и я архитектор департамента поддержки продаж компании Arenadata. В основном занимаюсь продуктом Arenadata Streaming (ADS) — это масштабируемая отказоустойчивая система для потоковой обработки данных в режиме реального времени, адаптированная для корпоративного использования и построенная на базе Apache Kafka и Apache NiFi. В продукт входит замечательный сервис Arenadata Streaming NiFi, который является low-code средством построения интеграционных потоков с возможностью масштабирования.
Сегодня хочу показать на одном из практических случаев, что не всегда в NiFi удаётся следовать шаблонному подходу record-oriented в разработке потоков обработки и трансформации данных.
Как известно, в NiFi применяется так называемый record-oriented-подход. Что это значит? Каждый контент FlowFile представляет собой не один элемент, а множество одинаковых по структуре записей. И работать с ним можно как с массивом записей, проводя необходимые трансформации и манипуляции, например выборки, обогащения либо изменения содержимого полей. В этом случае если запись изменяется, то контент FlowFile заново записывается на диск, а в атрибутах меняется системная ссылка на файл контента. Для чтения и записи элементов применяются специальные сервисы: RecordReader и RecordSetWriter, — которые, основываясь на схемах данных, позволяют процессорам обработки оперировать записями, не вдаваясь в детали их хранения. Такой подход позволяет сократить накладные расходы, так как хранить и обращаться к единому множеству записей в одном файле с одним набором метаданных выгоднее, чем хранить каждый файл с метаданными отдельно. Стандарты обработки данных тоже сместили фокус с индивидуальной обработки на работу с записями. И одной из моих рекомендаций всегда является применение обработки записей с помощью предназначенных для этого процессоров: UpdateRecord, JoltTransformRecord, QueryRecord и так далее. Это быстрее за счёт обращения к одному файлу, а оперативная память требуется только на один набор атрибутов, а не на множество.
Но что делать, если в вашем потоке обработки всё построено на записях, но от одного источника приходят данные, которые выпадают из общей парадигмы? Я расскажу об одном случае из моей практики, когда при решении задачи наполнения хранилища пришлось провести нестандартную обработку.
Для понимания задачи опишу информационную среду, в которой я разрабатывал интеграционные процессы. Есть компания, которая производит и продаёт товары, допустим мебель. Она использует различные бизнес-сервисы, позволяющие автоматизировать задачи по обороту товаров, логистике, складскому учёту, кадровым задачам, продажам и производству. Данные для построения отчётности компания собирает в аналитическом хранилище. А сама аналитика информации нужна для комплексной оценки эффективности бизнеса, текущей работы компании и прогнозирования во всех сферах деятельности. Источниками данных для хранилища являются различные ERP-, CRM-, PIM-, MDM-системы, веб-приложения и приложения обработки данных компании. В целях оптимизации информационного обмена прямой доступ между системами заменён на обмен сообщениями через единую шину данных. Чтобы сократить объём данных, в компании введено правило: в топике Kafkaнаходятся сообщения, сформированные по одной схеме, и сами сообщения преобразованы в Avro по стандарту Confluent. Схемы сохранены в ADS. Schema Registry. Обобщённая структура информационной среды представлена на рисунке.
Также в компании принят стандарт передачи сообщения в виде блока метаданных и данных. Обязательным элементом метаданных является время генерации сообщения, остальные поля оставлены на усмотрение генерирующий системы. Таким образом, данные представляют собой структуру, разделённую на два уровня: информационный блок и непосредственно структуру от источника, например, в таком виде:
Пример передаваемых данных
{ "metadata" : { "ts" : "2020-01-01 00:00:00.0000" }, "data" : { "someFields1" : "some value", "someFieeld2" : 0, "someFieeld2 : true } }
ETL-процесс наполнения хранилища выглядит максимально просто: извлечение данных из требуемых топиков, необходимые трансформации по приведению[АД1] к структуре, пригодной для внесения в таблицу базы данных, сопоставление исходного топика целевой таблице в хранилище и внесение данных в хранилище.
Следующим этапом нужно подготовить данные для внесения в таблицу. Для этого требуется преобразовать полученную иерархическую структуру в плоскую таблицу, сдвинув поля из вложенности на уровень вверх, убрать ненужные теги metadata и data. Дополнительно понадобилось добавить в структуру время формирования записи из метаданных. Стейджинг формируется таком образом, что одному топику, а следовательно, одной структуре, соответствует одна целевая таблица, при этом имя таблицы соответствует имени топика. Дальнейшая обработка данных вынесена в процедуры расчёта хранилища и не завязана на сам ETL-процесс.
При получении данных из Kafka с помощью процессора ConsumeKafkaRecord флаг «Separate By Key» выставлен в значение «false», что позволяет формировать один FlowFile для нескольких сообщений. Так как нужно просто «сдвинуть» поля на один уровень, в качестве инструмента сразу напрашивается Jolt. Для упрощения структуры данных можно применить простую Jolt-трансформацию, которая перебирает все поля второго уровня и сдвигает их на первый уровень:
Трансформация, преобразующая двухуровневую иерархию в плоскую струтуру
[ { "operation": "shift", "spec": { "*": { "*": "&" } } } ]
Трансформация позволяет работать с одной записью, и её можно применить в процессоре JoltTransformRecord. Таким образом, получается универсальный процесс, извлекающий данные из Kafka, трансформирующий их в плоскую структуру и передающий в целевую базу данных.
Конечно, представленный поток не готов к выводу в продуктовую среду, это только его ядро, которое необходимо дополнить обработкой ошибок, сформировать повторы, добавить оповещение. Тем не менее в таком виде уже получаем универсальный поток обработки, позволяющий загружать данные из любого топика в соответствующую таблицу целевой базы. Если внести список топиков в параметры контекста и управлять ими через API, то сможем автоматизировано управлять источниками.
Бизнес-системы не являются статичными, и список источников данных для аналитических хранилищ постоянно изменяется. В один момент появилась PIM/MDM-система, генерирующая данные не в простом формате, а с гораздо более сложной иерархией.
Сложная структура данных, генерируемая PIM/MDM-системой (очень большой JSON)
{ "metadata" : { "ts" : "2024-05-14 22:15:68.5896" }, "data" : { "object" : { "guid" : "04551935-d520-5785-b257-c8ae8240711c", "template" : "Item", "language" : null, "fields" : { "itemGroup" : { "guid" : "497b6e51-4c1d-5ce0-be02-b8f37ba35c26", "template" : "ItemGroup", "language" : null, "fields" : { "name" : "GroupNanme", "parentObject" : { "guid" : "32d38329-1868-5f51-bb30-f824258539fb", "template" : "ItemGroup", "language" : null, "fields" : { "name" : "parent name" } }, "specification" : { "guid" : null, "template" : "SpecificationBrickContainer", "language" : null, "fields" : { "internalGroupSpecification" : { "guid" : null, "template" : "InternalGroupSpecificationBrick", "language" : null, "fields" : { "volumeRatio" : "1.3", "groupAttribute" : { "guid" : "84281670-f476-5f3f-8b31-382f02e13e83", "template" : "ItemGroup", "language" : null, "fields" : { "name" : "group name", "parentObject" : { "guid" : "1ba54e86-bc86-59ac-93e9-34fa4a68abe3", "template" : "Folder", "language" : null, "fields" : { "name" : "another name" } } } }, "codeGroup" : "5561010305" } } } } } }, "additionalInformation" : { "guid" : null, "template" : "AdditionalInformationBrickContainer", "language" : null, "fields" : { "additionalInformation" : [ "AdditionalInformationBrick" ] } }, "manufactureCountry" : { "guid" : "f64ab0a6-cff1-5dcd-9f1c-cc43f1c58d03", "template" : "Property", "language" : null, "fields" : { "name" : "US" } }, "code" : "5896dkwskeo_kedoed896", "parentObject" : { "guid" : "ce5a885b-8170-5bbe-80c1-c0d2d41d0cad", "template" : "Item", "language" : null, "fields" : { "name" : "parent name", "parentObject" : { "guid" : "83c7b0c5-3d1c-50ff-a9f0-0b0fc6e47cbd", "template" : "ItemTemplate", "language" : null, "fields" : { "name" : "object source name" } }, "statusCode" : "draft" } }, "ean13" : "462711234164", "measureUnit" : { "unitName" : "item", "unitCode" : "796" }, "main" : { "guid" : null, "template" : "MainBrickContainer", "language" : null, "fields" : { "localizedDescription" : [ "LocalizedDescriptionBrick" ] } }, "media" : { "guid" : null, "template" : "MediaBrickContainer", "language" : null, "fields" : { "media" : [ "MediaBrick" ] } }, "commerce" : { "guid" : null, "template" : "CommerceBrickContainer", "language" : null, "fields" : { "product" : { "guid" : null, "template" : "ProductBrick", "language" : null, "fields" : { "ean13" : "4627112341690" } } } }, "pack" : [ { "packageName" : "coverage", "package" : null, "length" : { "value" : 140.0, "measureUnit" : { "unitName" : "sm", "unitCode" : "004" } }, "width" : { "value" : 200.0, "measureUnit" : { "unitName" : "sm", "unitCode" : "004" } }, "height" : { "value" : 10.0, "measureUnit" : { "unitName" : "sm", "unitCode" : "004" } }, "volume" : { "value" : 0.28, "measureUnit" : { "unitName" : "m3", "unitCode" : "113" } }, "weightNetto" : { "value" : 35.9, "measureUnit" : { "unitName" : "kg", "unitCode" : "166" } }, "weightGross" : { "value" : 35.9, "measureUnit" : { "unitName" : "kg", "unitCode" : "166" } }, "seatsCount" : null, "packagedUnitsCount" : null, "setSeatsCount" : null, "priority" : null, "measureUnit" : null, "mainLogisticsPackaging" : null, "barcode" : null } ], "isMatrix" : 1, "series" : { "guid" : "f68d1bc3-2600-5b40-b9b1-957d039cdd6b", "template" : "tmp", "language" : null, "fields" : { "name" : "seria name", "priceSegment" : { "guid" : "364044ec-0f96-5a5f-9028-9dc25f752e5e", "template" : "Property", "language" : null, "fields" : { "name" : "middle" } }, "parentObject" : { "guid" : "a497557f-4eda-5f16-807a-4b71eb5c1b74", "template" : "Seria", "language" : null, "fields" : { "name" : "GENERAL Seria name" } } } }, "name" : "Common Name", "options" : { "guid" : null, "template" : "OptionsCollection", "language" : null, "fields" : { "length" : { "guid" : null, "template" : "OptionsCollectionItem", "language" : null, "fields" : { "propertyType" : "length", "relation" : [ { "guid" : "28df4e5e-5201-519a-a3a5-57d7e1838361", "template" : "Property", "language" : null, "fields" : { "name" : 200, "options" : { "guid" : null, "template" : "OptionsBrickContainer", "language" : null, "fields" : { "propertyMDMData" : { "guid" : null, "template" : "PropertyMDMDataBrick", "language" : null, "fields" : { "code" : 10004 } }, "propertySize" : { "guid" : null, "template" : "PropertySizeBrick", "language" : null, "fields" : { "physicalQuantity" : "length", "numericValue" : { "value" : 200.0, "measureUnit" : { "unitName" : "sm", "unitCode" : "004" } } } } } } } } ] } }, "width" : { "guid" : null, "template" : "OptionsCollectionItem", "language" : null, "fields" : { "propertyType" : "width", "relation" : [ { "guid" : "59c08ec1-d0ff-55c4-99a4-6616f23f1469", "template" : "Property", "language" : null, "fields" : { "name" : 140, "options" : { "guid" : null, "template" : "OptionsBrickContainer", "language" : null, "fields" : { "propertyMDMData" : { "guid" : null, "template" : "PropertyMDMDataBrick", "language" : null, "fields" : { "code" : 10005 } }, "propertySize" : { "guid" : null, "template" : "PropertySizeBrick", "language" : null, "fields" : { "physicalQuantity" : "width", "numericValue" : { "value" : 140.0, "measureUnit" : { "unitName" : "sm", "unitCode" : "004" } } } } } } } } ] } }, "height" : { "guid" : null, "template" : "OptionsCollectionItem", "language" : null, "fields" : { "propertyType" : "height", "relation" : [ { "guid" : "c28c2569-7a0d-5d1a-94d4-4907dc91f20a", "template" : "Property", "language" : null, "fields" : { "name" : 112, "options" : { "guid" : null, "template" : "OptionsBrickContainer", "language" : null, "fields" : { "propertyMDMData" : { "guid" : null, "template" : "PropertyMDMDataBrick", "language" : null, "fields" : { "code" : 10006 } }, "propertySize" : { "guid" : null, "template" : "PropertySizeBrick", "language" : null, "fields" : { "physicalQuantity" : "height", "numericValue" : { "value" : 112.0, "measureUnit" : { "unitName" : "sm", "unitCode" : "004" } } } } } } } } ] } } } }, "sku" : 1563340922, "statusCode" : "draft" } }, "stateName" : "Updated" }
В данных от этой системы передаётся полная информация о товаре, его составляющих, размерах, свойствах упаковки. Такой формат позволяет потребителям самостоятельно определять способы хранения соответствующих сведений для каждого свойства товара, а системе — генерировать одно сообщение на единицу товара, не выгружая в отдельные потоки связанные данные. Мне же требовалось сформировать из этих сообщений плоскую таблицу по заданному SourceToTarget:
Поле |
Имя поля |
Тип данных |
data.object.guid |
guid |
uniqueidentifier |
data.object.fields.manufactureCountry.guid |
manufactureCountryGuid |
uniqueidentifier |
data.object.fields.manufactureCountry.fields.name |
manufactureCountryName |
text |
data.object.fields.itemGroup.guid |
itemGroupGuid |
uniqueidentifier |
data.object.fields.parentObject.guid |
parentGuid |
uniqueidentifier |
data.object.fields.sourceProduction.[0] |
sourceProductionValue |
bool |
data.object.fields.measureUnit.unitName |
unitName |
text |
data.object.fields.measureUnit.unitCode |
unitCode |
int |
data.object.fields.commerce.fields.product.fields.tax |
tax |
int |
data.object.fields.series.guid |
seriesGuid |
uniqueidentifier |
data.object.fields.series.fields.name |
seriesName |
text |
data.object.fields.series.fields.priceSegment.guid |
priceSegmentGuid |
uniqueidentifier |
data.object.fields.series.fields.priceSegment.fields.name |
priceSegmentName |
text |
data.object.fields.name |
name |
text |
data.object.fields.dimensionCharacteristics |
dimensionCharacteristics |
int |
data.object.fields.sku |
sku |
text |
data.object.fields.isOutOfCollection |
isOutOfCollection |
bool |
data.object.fields.isMatrix |
isMatrix |
bool |
data.object.fields.kit |
kit |
int |
data.object.fields.numberOfParts |
numberOfParts |
int |
data.object.fields.ean13 |
ean13 |
text |
data.object.fields.code |
code |
text |
data.object.fields.statusCode |
statusCode |
text |
data.stateName |
stateName |
text |
metadata.ts |
ts |
datetime2 |
Выход простой: написать спецификацию Jolt для текущего случая, сделать ответвление на основании имени топика, а после трансформации вернуть поток в стандартный маршрут. Разработав и отладив спецификацию, применил её в JoltTransformRecord.
Спецификация, решающая поставленную задачу
[ { "operation": "shift", "spec": { "metadata": { "*": "&" }, "data": { "object": { "guid": "&", "fields": { "manufactureCountry": { "guid": "manufactureCountryGuid", "fields": { "name": "manufactureCountryName" } }, "itemGroup": { "guid": "itemGroupGuid" }, "parentObject": { "guid": "parentGuid" }, "sourceProduction": { "0": "SourceProductionValue" }, "measureUnit": { "*": "&" }, "commerce": { "fields": { "product": { "fields": { "tax": "&" } } } }, "series": { "guid": "seriesGuid", "fields": { "name": "seriesName", "priceSegment": { "guid": "priceSegmentGuid", "fields": { "name": "priceSegmentName" } } } }, "name": "&", "dimensionCharacteristics": "&", "sku": "&", "isOutOfCollection": "&", "isMatrix": "&", "kit": "&", "numberOfParts": "&", "ean13": "&", "code": "&", "statusCode": "&" } }, "stateName": "StateName" } } }, { "operation": "modify-overwrite-beta", "spec": { "SourceProductionValue": "=toInteger", "unitCode": "=toInteger", "tax": "=toInteger", "dimensionCharacteristics": "=toInteger", "isOutOfCollection": "=toInteger", "isMatrix": "=toInteger", "kit": "=toInteger", "numberOfParts": "=toInteger", "sku": "=toString" } }, { "operation": "shift", "spec": { "isOutOfCollection": { "0": { "#false": "isOutOfCollection" }, "1": { "#true": "isOutOfCollection" } }, "isMatrix": { "0": { "#false": "isMatrix" }, "1": { "#true": "isMatrix" } }, "kit": { "0": { "#false": "kit" }, "1": { "#true": "kit" } }, "*": "&" } }, { "operation": "modify-overwrite-beta", "spec": { "isOutOfCollection": "=toBoolean", "isMatrix": "=toBoolean", "kit": "=toBoolean" } } ]
Пояснение по блокам спецификации JOLT
Первый блок «shift» сдвигает данные на один уровень. В нем все просто — взять указанное значение и поместить его в указанное поле, но на другом уровне. Например, поле «data.object.guid» перемещается в «guid»:
{ "operation": "shift", "spec": { "metadata": { "*": "&" }, "data": { "object": { "guid": "&"
Следующий блок «modify-overwrite-beta» преобразует данные к требуемым типам, так как в исходных данных тип может не соответствовать целевому. Пример такого преобразования для поля «isMatrix». Несмотря на то, что в исходном виде это число 0 или 1, в результате трасформации поле приводилось к строке.
"isMatrix": "=toInteger"
Цель преобразования некоторых полей к численным типам раскрывается в следующем блоке трансформации «shift». Не многие знают, что с помощью это операции можно не только перенести значения из одного поля в другое, но также делать это с условиями:
"isMatrix": { "0": { "#false": "isMatrix" }, "1": { "#true": "isMatrix" }
Исходя из значения поля «isMatrix» значение в нем заменяется на строку «true» или «false». И самым последним блоком строковое представление преобразуется к логическому типу:
{ "operation": "modify-overwrite-beta", "spec": { "isOutOfCollection": "=toBoolean", "isMatrix": "=toBoolean", "kit": "=toBoolean" } }
В результате получено полное соответствие S2T.
Однако в процессе применения трансформации формировалась ошибка:
JoltTransformRecord[id=018f1086-7c30-1015-2569-a67ef2af5b1d] Unable to transform FlowFile[filename=0c119857-beac-4f4f-96c7-406b71c2c9fc] due to org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value of class [Ljava.lang.Object; because the type is not supported: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value of class [Ljava.lang.Object; because the type is not supported
При получении ошибки я провёл ряд манипуляций, которые делаю всегда: разбить FlowFile на единичные записи, упростить трансформацию, преобразовать форматы. Результат был отрицательным, так как процессор JoltTransformRecord постоянно выдавал ошибку. Упрощая структуру сообщения, я выяснил, что ошибка возникает потому, что JoltTransformRecord не может корректно обработать массив, в котором присутствуют вариативные типы данных, определяемые типом map. Также не помог вариант с предварительным формированием целевой схемы данных и указанием её соответствующему RecordSetWriter. Перевод типа данных из Avro в JSON тоже не дал результата, так как тип map и массивы записей никуда не ушли и стали генерироваться на основании самих данных.
Avro-схема данных
{ "type": "record", "name": "Mdm_Record", "namespace": "any.org", "fields": [ { "name": "metadata", "type": { "type": "record", "name": "MessageInfo", "namespace": "any.org", "fields": [ { "name": "ts", "type": { "type": "long", "logicalType": "timestamp-millis" } } ] } }, { "name": "data", "type": { "type": "record", "name": "MDM_Item", "fields": [ { "name": "object", "type": { "type": "record", "name": "Node", "fields": [ { "name": "guid", "type": [ "null", "string" ], "default": null }, { "name": "template", "type": [ "null", "string" ], "default": null }, { "name": "language", "type": [ "null", "string" ], "default": null }, { "name": "fields", "type": { "type": "map", "values": [ "Node", { "type": "record", "name": "Barcode", "fields": [ { "name": "provider", "type": { "type": "record", "name": "Provider", "fields": [ { "name": "name", "type": "string" }, { "name": "city", "type": [ "null", "string" ], "default": null }, { "name": "taxIdNumber", "type": [ "null", "string" ], "default": null }, { "name": "taxRegistrationReasonCode", "type": [ "null", "string" ], "default": null } ] } }, { "name": "barcode", "type": [ "null", "string" ], "default": null }, { "name": "supplierArticle", "type": [ "null", "string" ], "default": null } ] }, { "type": "record", "name": "MediaResource", "fields": [ { "name": "link", "type": "string", }, { "name": "filename", "type": [ "null", "string" ], "default": null }, { "name": "mimetype", "type": [ "null", "string" ], "default": null }, { "name": "fileSize", "type": [ "null", "int" ], "default": null }, { "name": "format", "type": [ "null", "string" ], "default": null } ] }, { "type": "record", "name": "SofaSpecification", "fields": [ { "name": "seats", "type": [ "null", "int" ], "default": null }, { "name": "bedLength", "type": { "type": "record", "name": "QuantityValue", "fields": [ { "name": "value", "type": "float", }, { "name": "measureUnit", "type": [ "null", { "type": "record", "name": "MeasureUnit", "fields": [ { "name": "unitName", "type": "string", }, { "name": "unitCode", "type": "string", } ] } ], "default": null } ] } }, { "name": "bedWidth", "type": "QuantityValue", }, { "name": "backHeightFolded", "type": "QuantityValue", }, { "name": "backHeightFoldedOut", "type": "QuantityValue", }, { "name": "fullLength", "type": "QuantityValue", } ] }, { "type": "record", "name": "MattressSpecification", "fields": [ { "name": "bedLength", "type": "QuantityValue", }, { "name": "bedWidth", "type": "QuantityValue", }, { "name": "height", "type": "QuantityValue", }, { "name": "maximumBedLoad", "type": "QuantityValue", } ] }, "QuantityValue", "MeasureUnit", { "type": "array", "items": [ "Node", "Barcode", { "type": "record", "name": "ItemPack", "fields": [ { "name": "packageName", "type": "string", }, { "name": "package", "type": [ "null", { "type": "record", "name": "Pack", "fields": [ { "name": "packageGuid", "type": "string", }, { "name": "packageType", "type": [ "null", "string" ], "default": null }, { "name": "packaging", "type": { "type": "map", "values": [ { "type": "record", "name": "Box", "fields": [ { "name": "length", "type": "QuantityValue", }, { "name": "width", "type": "QuantityValue", }, { "name": "height", "type": "QuantityValue", } ] }, { "type": "record", "name": "SoftPackaging", "fields": [ { "name": "twist", "type": "boolean", } ] } ] }, } ] } ], "default": null }, { "name": "length", "type": "QuantityValue", }, { "name": "width", "type": "QuantityValue", }, { "name": "height", "type": "QuantityValue", }, { "name": "volume", "type": "QuantityValue", }, { "name": "weightNetto", "type": "QuantityValue", }, { "name": "weightGross", "type": "QuantityValue", }, { "name": "seatsCount", "type": [ "null", "int" ], "default": null }, { "name": "packagedUnitsCount", "type": [ "null", "int" ], "default": null }, { "name": "setSeatsCount", "type": [ "null", "int" ], "default": null }, { "name": "priority", "type": [ "null", "int" ], "default": null }, { "name": "measureUnit", "type": [ "null", "MeasureUnit" ], "default": null }, { "name": "mainLogisticsPackaging", "type": [ "null", "boolean" ], "default": null }, { "name": "barcode", "type": [ "null", "string" ], "default": null } ] }, "MediaResource", "string", "int", "boolean" ] }, { "type": "map", "values": [ "string", "int", "boolean" ] }, "string", "int", "boolean" ] } } ] } }, { "name": "stateName", "type": "string" } ] } } ] }
Так как выбранный ранее вариант с трансформацией Jolt не работал, я решил применить проверенное средство для крайних случаев — скрипт. Есть хороший процессор — ScriptedTransformRecord, позволяющий выполнять обработку по одной записи, получая объект типа Record.
Скрипт получился довольно большим по причине наличия типов map, а также возможности значения «null» в требуемых полях. Так как существует вероятность модификации формата данных либо корректировки S2T, то в будущем потребуется изменять скрипт, что при его большом объёме влечёт увеличение сложности для разработчика. Так что я решил отказаться от скрипта и вернуться к разработанной и отлаженной спецификации, ведь для JSON она работает корректно, ошибка возникает только при работе с записью, когда применяется схема данных.
То есть в этом случае я решил отказаться от обработки записи и перейти к обработке контента целиком, что приводит к увеличению количества FlowFile и, соответственно, увеличению задействования оперативной памяти.
В NiFi можно применить спецификацию Jolt к записям c помощью JoltTransformRecord или воспользоваться процессором JoltTransformJSON, который ожидает на входе JSON и преобразовывает его не как запись, а как единый JSON-файл. Так как происходит обработка всего контента целиком, то для сокращения накладных расходов лучше подавать на вход единичный объект JSON, а не массив. Поэтому предварительно потребовалась разбивка пришедшего FlowFile на фрагменты, где каждый содержал бы один JSON. Это позволило в дальнейшем выполнить трансформацию для единичного объекта быстро, но повлекло генерацию большого количества FlowFile. Для этого применил SplitRecord, где Reader читал Avro-формат, а RecordSetWriter был настроен для записи одного JSON-объекта.
Для сокращения дальнейших накладных расходов данные объединяются с помощью MergeRecord. Перед слиянием выполнена конвертация в Avro с заданной целевой схемой, соответствующей S2T.
Далее данные вернулись в основной поток и были успешно внесены в базу данных. Таким образом, из-за сложной структуры данных пришлось изменить основной паттерн работы с данными в NiFi — работать с записями. В итоге, допустив проигрыш по ресурсу на небольшом участке потока обработки данных, в целом я выиграл, оставаясь в ограниченном круге инструментов (только Jolt в трансформациях) и вернув данные в общий поток.
По скорости обработки ScriptedTransformRecord был быстрее из-за обработки одного файла с множеством записей. Однако сложность скрипта в процессе модернизации или поддержки снижает привлекательность такого решения. Анализ же спецификации займёт меньше времени, тем самым позволив быстрее вынести корректировки в случае изменения структуры данных.
В ходе дальнейшей работы стало ясно, что ответвление оправдало себя многократно. Так как исходный объект является записью о товаре и включает совокупность большого количества сущностей и связей с другими сущностями, то из одного сообщения понадобилось формировать более десятка стейджинговых таблиц. И решение было простое: раз уже есть разбивка на отдельные JSON, можно разработать отдельные спецификации Jolt для каждого случая, сделать итоговые схемы данных и все преобразования реализовать одинаковым шаблоном.
Что я хотел сказать, описывая этот случай. В NiFi есть много способов достичь требуемого результата. Иногда стоит попробовать несколько вариантов, но выбирать тот, который более прост в понимании разработчиком и требует меньше усилий при поддержке или модификации.
В заключение сформирую ряд рекомендаций для разработчиков, которые, на мой взгляд, будут полезны:
-
Вся обработка должна выполняться над записями. Это позволит применять бинарные форматы и минимизировать затраты на преобразования.
-
Переходить к обработке контента целиком только в случае, когда ресурсоёмкость работы с записями (тут и затраты на поддержку, и развитие потока, и сама обработка) превышает выбранную вами границу.
-
Если потребовалось перейти к обработке контента целиком, то необходимо вернуться к записям как можно быстрее, при этом обеспечить удаление всех ссылок на единичные FlowFile для очистки репозитория контента и оперативной памяти.
-
Формировать «универсальные» потоки, где параметры процессоров задаются на основе атрибутов. Это позволит применять один и тот же набор процессоров для разных источников, минимизировав количество запускаемых экземпляров процессоров и снизив нагрузку на планировщик.
Полезные ссылки:
-
Сайт Arenadata — https://arenadata.tech
-
Описание продукта Arenadata Streaming — https://arenadata.tech/products/arenadata-streaming/
-
Сообщество Apache Nifi в телеграм — https://t.me/nifiusers
-
Приложение, позволяющее отлаживать Jolt — http://jolt-demo.appspot.com/#inception
ссылка на оригинал статьи https://habr.com/ru/articles/827534/
Добавить комментарий