Ремарка:
Текущая реализация представляет собой сырой прототип, направленный исключительно на демонстрацию возможности отображения логического плана из Apache Spark в Apache Atlas. Lfyysq прототип, по сути, является «прототипом прототипа» и служит лишь начальной отправной точкой для более глубокого анализа и разработки.
В данной работе Автор не стремимся представить окончательное или оптимальное решение. основной фокус заключается в демонстрации принципа и наметке необходимых методов для интеграции логических планов с метаданными в Apache Atlas.
Автор не призываем использовать данный подход в производственной среде в его текущем виде. Для полноценного решения задачи требуется дальнейшая проработка, включая создание специализированных библиотек, улучшение архитектуры. И все прочие прочие …
Цель Работы:
Целью данной работы является создание прототипа, демонстрирующего возможность интеграции логических планов Apache Spark с метаданными в Apache Atlas , подобно тому как это происходит в данной статье с Apache NIFI .
Тестовая задача для иллюстрации и парсинг плана в AST:
Определим небольшой фалй cars.csv со соледующим содержанием:
model,manufacturer Model S,Tesla Model 3,Tesla Mustang,Ford Civic,Honda
И напишем даг выведем его логический план:
val spark = SparkSession.builder() .appName("Logical Plan Example") .master("local") .getOrCreate() import spark.implicits._ val carsCSV = spark .read .option("header", "true") .csv("src/main/resources/cars.csv") val carsSeq = List( ("i8", "BMW"), ("A4", "Audi"), ("911", "Porsche"), ("Corolla", "Toyota") ).toDF("model", "manufacturer") val unioncars = carsCSV.union(carsSeq) val resDF = unioncars .where(col("manufacturer") =!= "Audi") .select("model", "manufacturer") .withColumn("processedDDTM", lit(LocalDateTime.now())) val logicalPlan = resDF.queryExecution.logical println(logicalPlan) /* вывод Project [model#17, manufacturer#18, 2024-09-12 13:00:46.880141 AS processedDDTM#36] +- Project [model#17, manufacturer#18] +- Filter NOT (manufacturer#18 = Audi) +- Union false, false :- Relation [model#17,manufacturer#18] csv +- Project [_1#23 AS model#28, _2#24 AS manufacturer#29] +- LocalRelation [_1#23, _2#24] */ }
Логический план представляет собой дерево, и для дальнейшей работы его необходимо преобразовать в удобную форму (AST).
Для этого мы определим класс AST, который будет отражать структуру плана в формате, удобном для последующей обработки.
// Определение корневого класса или типа для всех узлов дерева sealed trait Node { // Метод для получения имени узла на основе его типа def getName: String = this.getClass.toString } // Узел типа "Проект", содержащий последовательность столбцов case class ProjectNode(columns: Seq[String]) extends Node { // Переопределение метода getName для возврата конкретного имени узла override def getName: String = "Project" } // Узел типа "Фильтр", содержащий условие фильтрации case class FilterNode(condition: String) extends Node { // Переопределение метода getName для возврата конкретного имени узла override def getName: String = "Filter" } // Узел типа "Объединение", указывающий, следует ли объединять все записи и по какому признаку case class UnionNode(isAll: Boolean, byName: Boolean) extends Node { // Переопределение метода getName для возврата конкретного имени узла override def getName: String = "Union" } // Узел типа "Логическое отношение", содержащий последовательность столбцов case class LogicalRelationNode(columns: Seq[String]) extends Node { // Переопределение метода getName для возврата конкретного имени узла override def getName: String = "LogicalRelation" } case class LocalRelationNode(columns: Seq[String]) extends Node { override def getName: String = "LocalRelation" } // Узел типа "Локальное отношение", содержащий последовательность столбцов case class LocalRelationNode(columns: Seq[String]) extends Node { // Переопределение метода getName для возврата конкретного имени узла override def getName: String = "LocalRelation" } // Класс для представления абстрактного синтаксического дерева (AST), где каждый узел имеет тип Node, // список дочерних узлов, номер уровня и выражение уровня (необходим для индонтефикации нод на одном уровне) case class AST(node: Node, children: Seq[AST], level_num: Int, levelExpr: String)
И напишем парсер из логического плана в AST
// Объект для парсинга логических планов в AST object ParserAST { // Функция для преобразования логического плана в AST // Возвращает Option[AST], где None означает, что план не может быть преобразован private def parseAST(plan: LogicalPlan): Option[AST] = { // Рекурсивная функция для обхода логического плана и создания узлов AST // Параметры: // - logicalPlan: текущий логический план для обработки // - levelnum: уровень в дереве AST // - levelExpr: строковое представление уровня и индекса // Возвращает Option[AST], где None означает, что логический план не может быть преобразован def loop(logicalPlan: LogicalPlan, levelnum: Int, levelExpr: String): Option[AST] = { // Определение узла на основе типа логического плана val node: Option[Node] = logicalPlan match { case p: Project => // Обработка узла типа Project и создание узла AST с именем "Project" val columns = p.projectList.map(_.sql) Some(ProjectNode(columns)) case f: Filter => // Обработка узла типа Filter и создание узла AST с именем "Filter" val condition = f.condition.sql Some(FilterNode(condition)) case u: Union => // Обработка узла типа Union и создание узла AST с именем "Union" val isAll = u.allowMissingCol val byName = u.byName Some(UnionNode(isAll, byName)) case lr: LocalRelation => // Обработка узла типа LocalRelation и создание узла AST с именем "LocalRelation" val columns = lr.output.map(_.sql) Some(LocalRelationNode(columns)) case lr: LogicalRelation => // Обработка узла типа LogicalRelation и создание узла AST с именем "LogicalRelation" val columns = lr.output.map(_.sql) Some(LogicalRelationNode(columns)) case _ => // Если логический план не совпадает ни с одним из известных типов, возвращаем None None } // Если узел успешно создан, создаем AST и рекурсивно обрабатываем детей node.map { n => // Создание списка дочерних узлов AST, рекурсивно обрабатывая каждый дочерний план val children = logicalPlan.children.zipWithIndex.flatMap { case (ch, i) => loop(ch, levelnum + 1, f"${levelnum + 1}_${i}") }.toList // Создание узла AST с текущим узлом и его дочерними узлами AST(n, children, levelnum, levelExpr) } } // Запуск рекурсивного обхода с начальным уровнем и строковым представлением loop(plan, 1, "1_0") } // Неявное преобразование для класса LogicalPlan, добавляющее метод для получения AST implicit class parser(lp: LogicalPlan) { def AST(): Option[AST] = { parseAST(lp) } } }
теперь можно получать AST следующим образом logicalPlan.AST().get
определим сущености в Атласе для построения Lianage:
Подобно тому, как в языках программирования на базе Java все классы наследуются от Object, в Apache Atlas все сущности наследуются от Referenceable. Однако построение lineage (линейности данных) происходит только для типов Process и DataSet. Если тип не наследуется от одного из этих классов (например, если наследование происходит от Asset), то кнопка «Lineage» попросту не появится.
Кроме того, сам lineage строится на основе полей inputs и outputs для Process, аналогично и для DataSet. Здесь ничего не поделаешь — придется наследоваться от этих типов, хотя большинство полей будет оставаться пустыми.
Изначально моей целью было отразить преобразования, происходящие в Apache Spark, но структура Apache Atlas вынуждает окружать мои Process сущностями DataSet в полях inputs и outputs. Хотя меня изначально интересовали только Process, эти DataSet-ы могут быть использованы для отображения схем данных, с которыми процесс начинается и которые возвращает. Однако на данном этапе я не планирую парсить схемы и оставлю каждый DataSet пустым.
В Apache Atlas кастомные сущности можно описывать с помощью формата JSON. При этом важно соблюдать правильную последовательность определения типов, иначе возникнет ошибка 404 при попытке сослаться на тип, который еще не существует в системе.
Сначала определим тип для DataSet.
{ "enumDefs": [], "structDefs": [], "classificationDefs": [], "entityDefs": [ { "name": "pico_spark_data_type", "description": "A type inheriting from assets for Pico DataSet", "superTypes": ["DataSet"], "attributeDefs": [], "relationshipDefs": [] } ], "relationshipDefs": [], "businessMetadataDefs": [] }
Комментарии:
-
enumDefs,structDefs,classificationDefs:-
Пустые массивы, так как перечисления, структуры и классификации не используются.
-
-
entityDefs:-
Определяет сущности в системе.
-
name: Имя сущности, которая представляет тип данных. -
description: Описание сущности. -
superTypes: Суперклассы, от которых наследуется данная сущность. -
attributeDefs: Пустой массив, так как атрибуты не указаны. -
relationshipDefs: Пустой массив, так как связи не определены.
-
-
relationshipDefs,businessMetadataDefs:-
Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.
-
{ "enumDefs": [], "structDefs": [], "classificationDefs": [], "entityDefs": [ { "name": "pico_spark_process_type", "description": "A type inheriting from assets for Pico Spark abstraction", "superTypes": ["Process"], "attributeDefs": [ { "name": "inputs", "description": "List of inputs for the process", "typeName": "array<pico_spark_data_type>", "isOptional": true }, { "name": "outputs", "description": "List of outputs for the process", "typeName": "array<pico_spark_data_type>", "isOptional": true } ], "relationshipDefs": [] } ], "relationshipDefs": [], "businessMetadataDefs": [] }
Комментарии:
-
enumDefs,structDefs,classificationDefs:-
Пустые массивы, так как перечисления, структуры и классификации не используются в данном определении.
-
-
entityDefs:-
Содержит определения сущностей.
-
name: Имя сущности, определяющей тип данных в контексте Pico Spark. -
description: Описание сущности. -
superTypes: Суперклассы, от которых сущность наследуется. -
attributeDefs: Пустой массив, так как атрибуты не добавлены. -
relationshipDefs: Пустой массив, так как связи не указаны.
-
-
relationshipDefs,businessMetadataDefs:-
Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.
-
Для типа pico_spark_process_type я также создаю наследников для всех типов узлов (Filter, Project, Union и т.д.) в AST. Однако здесь я опущу это, поскольку это займет слишком много места и будет слишком однообразно.
В этих JSON-ах много пустых сущностей, но без них не обойтись, так как без них типы в Apache Atlas не создаются.
Взаимодействие с Apache Atlas по REST:
Простого описания сущностей недостаточно — их нужно передать в Apache Atlas. У Atlas есть обширное REST API для взаимодействия с системой. Конкретно процесс создания нового типа выглядит следующим образом:
curl -X POST "http://<atlas-server-url>/api/atlas/v2/types/typedefs" \ -H "Content-Type: application/json" \ -H "Accept: application/json" \ -d '{ "enumDefs": [], "structDefs": [], "classificationDefs": [], "entityDefs": [ { "name": "pico_spark_data_type", "description": "A type inheriting from assets for Pico DataSet", "superTypes": ["DataSet"], "attributeDefs": [], "relationshipDefs": [] } ], "relationshipDefs": [], "businessMetadataDefs": [] }'
создаю JSON файл где будут перечислены тела запросов для всх необходимых кастомных типов под названием EntityTypes.json
и создам метод который читает этот файл и делает запрос на каждый EntityType
val atlasServerUrl = "http://localhost:21000/api/atlas/v2" val authHeader: String = "Basic " + java.util.Base64.getEncoder.encodeToString("admin:admin".getBytes) def generatePicoSparkTypes(): Unit = { // Функция для чтения содержимого файла из ресурсов def readFileFromResources(fileName: String): String = { val source = Source.fromResource(fileName) try source.mkString finally source.close() } // Чтение JSON из файла ресурсов val jsonString = readFileFromResources("EntityTypes.json") // Попытка разобрать строку JSON в структуру данных val parsedJson: Either[ParsingFailure, Json] = parse(jsonString) // Преобразование разобранного JSON в список объектов JSON val jsonObjects: Option[List[Json]] = parsedJson match { case Right(json) => json.as[List[Json]] match { case Right(jsonArray) => Some(jsonArray) case Left(error) => // Обработка ошибки разбора массива JSON println(s"Error parsing JSON array: $error") None } case Left(error) => // Обработка ошибки разбора JSON println(s"Error parsing JSON: $error") None } // Отправка каждого объекта JSON на сервер Atlas jsonObjects match { case Some(jsonArray) => jsonArray.foreach { jsonBody => // Создание POST-запроса для создания типа в Apache Atlas val createTypeRequest = basicRequest .method(Method.POST, uri"$atlasServerUrl/types/typedefs") // Метод POST и URL для запроса .header("Authorization", authHeader) // Заголовок авторизации .header("Content-Type", "application/json") // Заголовок типа содержимого .header("Accept", "application/json") // Заголовок для принятия ответа в формате JSON .body(jsonBody.noSpaces) // Тело запроса с JSON-данными .response(asString) // Ожидание ответа в формате строки // Отправка запроса и вывод результата val response = createTypeRequest.send(backend) println(response.body) // Печать тела ответа println(response.code) // Печать кода ответа } case None => // Сообщение, если JSON-объекты не были найдены println("No JSON objects found.") } }
коментарии:
-
readFileFromResources: Функция для чтения содержимого файла JSON из ресурсов. -
jsonString: Получение строки JSON из файла. -
parsedJson: Попытка разобрать строку JSON в структуру данныхJson. -
jsonObjects: Преобразование разобранного JSON в список объектов JSON. -
jsonArray.foreach: Для каждого объекта JSON создается и отправляется POST-запрос на сервер Atlas. -
createTypeRequest: Создание POST-запроса с JSON-данными для создания типов в Apache Atlas. -
response: Отправка запроса и вывод результата, включая тело ответа и код ответа.
теперь для создания всех энтити в Apache Atlas достаточно вызвать метод generatePicoSparkTypes()
Поскольку DataSet сущности уже созданы, можно сразу приступить к созданию Process сущностей с заполненными полями inputs и outputs. Это важно, так как при попытках обновления сущностей через API ничего не сработало. Начнем с определения набора методов:
как видим все EntityType созданы
Создаем DataSet Entity:
перед тем как создовать сущьгъности процкссов нужно создать сущьности DataSet-тов, поскольк первые ссылаються на вторые
на данном уже определен pico_spark_data_type который отвечает за входные / выходные схемы данных
для начала определимся сдвумя вспомогательными мктодами
/** * Создает функцию для отправки JSON данных на указанный эндпоинт в Apache Atlas. * * @param postfix Строка, добавляемая к базовому URL для формирования полного URL эндпоинта. * @return Функция, принимающая JSON строку и отправляющая ее на сервер через HTTP POST запрос. */ def senderJsonToAtlasEndpoint(postfix: String): String => Unit = { jsonBody => { // Создание HTTP POST запроса для отправки JSON данных на сервер val createTypeRequest = basicRequest .method(Method.POST, uri"$atlasServerUrl/${postfix}") .header("Authorization", authHeader) .header("Content-Type", "application/json") .header("Accept", "application/json") .body(jsonBody) .response(asString) // Отправка запроса и получение ответа val response = createTypeRequest.send(backend) // Вывод тела ответа и кода статуса println(response.body) println(response.code) } } /** * Генерирует и отправляет сущности данных Spark в Apache Atlas для указанного домена. * * @param domain Домен, который будет использоваться в атрибутах сущностей. * @param execJsonAtlas Функция для отправки JSON данных в Apache Atlas. * @return Функция, принимающая AST и создающая JSON для каждой дочерней сущности. */ def generateSparkDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = { // Локальная функция для генерации и отправки сущностей данных Spark def generateEntities(ast: AST): Unit = { ast.children.foreach { inast => // Формирование JSON тела для сущности данных Spark val jsonBody = f""" |{ | "entity": { | "typeName": "pico_spark_data_type", | "attributes": { | "domain": "${domain}", | "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}", | "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}", | "description": "A description for the spark_data" | } | } |} |""".stripMargin // Отправка сформированного JSON тела на сервер execJsonAtlas(jsonBody) // Рекурсивный вызов для обработки дочерних узлов generateEntities(inast) } } // Возвращаем функцию для генерации сущностей generateEntities }
Пояснения:
-
senderJsonToAtlasEndpoint: Эта функция создает и возвращает другую функцию, которая отправляет JSON данные на указанный эндпоинт в Apache Atlas. Комментарии объясняют параметры, создание запроса, отправку и обработку ответа. -
generateSparkDataEntities: Эта функция генерирует сущности данных Spark, формирует соответствующий JSON и отправляет его в Apache Atlas, используя переданную функцию для отправки. Комментарии описывают параметры и внутреннюю логику функции, включая рекурсивный вызов для обработки всех дочерних узлов.
Напишкм еще 2 метода для запуска формирования Linage В Atlas
/** * Преобразует AST (абстрактное синтаксическое дерево) в сущности Apache Atlas и отправляет их на сервер. * * @param ast Абстрактное синтаксическое дерево, представляющее структуру данных. * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей. * @param topLevelExpr Выражение уровня, используемое для определения уровня в AST. В данном случае не используется. */ def ASTToAtlasEntity(ast: AST, domain: String, topLevelExpr: String): Unit = { // Создание функции для отправки JSON данных на эндпоинт "entity" в Apache Atlas val entitySender = senderJsonToAtlasEndpoint("entity") // Создание функции для генерации сущностей данных Spark и отправки их в Apache Atlas val sparkDataEntityGenerator = generateSparkDataEntities(domain, entitySender) // Создание базовых сущностей вывода и отправка их на сервер //ее реализацию опущу createBaseOutput(domain, entitySender) // Создание базовых сущностей ввода и отправка их на сервер //ее реализацию опущу createBaseInput(domain, entitySender) // Генерация и отправка сущностей данных Spark на основе AST sparkDataEntityGenerator(ast) } /** * Имплементация расширения для преобразования AST в сущности Apache Atlas. * * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей. */ implicit class converter(ast: AST) { /** * Преобразует текущее AST в сущности Apache Atlas и отправляет их на сервер. * * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей. */ def EntityToAtlas(domain: String): Unit = { ASTToAtlasEntity(ast, domain, "") } }
Пояснения:
-
ASTToAtlasEntity: Этот метод преобразует переданное AST в сущности Apache Atlas и отправляет их на сервер. Он использует вспомогательные функции для создания базовых сущностей и генерации сущностей данных Spark, а также отправляет их на сервер через созданную функциюentitySender. -
EntityToAtlas: Это метод расширения (implicit class) для типаAST, который упрощает вызов методаASTToAtlasEntityс дефолтным значением дляtopLevelExpr. Этот метод предоставляет удобный способ преобразования AST в сущности Apache Atlas, используя указанный домен.
Теперь при запуске ast.EntityToAtlas("picoDomain")В атласе появляеться data entity
так как DataSet Entity уже созданы, то можно создовать Process Entity сразу с заролнеными inputs и outputs, это важно поскольку сколько я не тыкалась в Api для обновления Entuty ничкго не работало.
начнем с того что определим пачку методов:
// Создает функцию для отправки сущностей в Apache Atlas // Использует функцию преобразования AST в JSON и функцию отправки JSON def senderEntity(nodeToAtlasCreateEntityJson: (AST, String) => String, execJsonAtlas: String => Unit): (AST, String) => Unit = { // Возвращает функцию, которая преобразует AST в JSON и отправляет его в Atlas (ast: AST, topLevelExpr: String) => { val jsonBody = nodeToAtlasCreateEntityJson(ast, topLevelExpr) execJsonAtlas(jsonBody) } } // Генерирует JSON для сущностей в Atlas на основе AST и уровня // Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д. def generatotrProcessEntity(domain: String, qualifiedName: (Node, String) => String): (AST, String) => String = { (ast: AST, topLevelExpr: String) => { val node = ast.node // Создает список входных сущностей, если есть дочерние элементы val inputs = if (ast.children.nonEmpty) { ast.children.map(_.levelExpr).map { expr => f""" | |{ | "typeName": "pico_spark_data_type", | "uniqueAttributes": { | "qualifiedName": "pico_spark_data_${ast.levelExpr}-${expr}@${domain}" | } |} | |""".stripMargin }.mkString(", ") } else { f""" | { | "typeName": "pico_spark_data_type", | "uniqueAttributes": { | "qualifiedName": "pico_spark_data_input@${domain}" | } | } |""".stripMargin } // Создает JSON для выходных сущностей, если задан topLevelExpr val output = if (topLevelExpr.nonEmpty) { f""" | { | "typeName": "pico_spark_data_type", | "uniqueAttributes": { | "qualifiedName": "pico_spark_data_${topLevelExpr}-${ast.levelExpr}@${domain}" | } | } |""".stripMargin } else { f""" | { | "typeName": "pico_spark_data_type", | "uniqueAttributes": { | "qualifiedName": "pico_spark_data_output@${domain}" | } | } |""".stripMargin } // Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д. node match { case p: ProjectNode => f""" |{ |"entity": { | "typeName": "pico_spark_project_type", | "attributes": { | "qualifiedName": "${qualifiedName(node, ast.levelExpr)}", | "name": "pico_project_${ast.levelExpr}", | "description": "This is an project for the pico_spark_project_type", | "columns": [${p.columns.map(col => "\"" + col + "\"").mkString(", ")}], | "inputs":[ ${inputs} ], | "outputs":[ ${output} ] | } | } |} |""".stripMargin case ... } } } // Создает функцию для генерации и отправки сущностей в Apache Atlas // Использует предоставленные функции для создания JSON и отправки его в Atlas def generatorDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = { def sparkDataEntitys(ast: AST): Unit = { ast.children.foreach { inast => val jsonBody = f""" |{ | "entity": { | "typeName": "pico_spark_data_type", | "attributes": { | "domain": "${domain}", | "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}", | "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}", | "description": "A description for the spark_data" | } | } |} |""".stripMargin execJsonAtlas(jsonBody) sparkDataEntitys(inast) } } // Возвращает функцию, которая генерирует и отправляет сущности данных для Spark sparkDataEntitys }
Пояснения:
-
senderEntity: Функция, которая создает и отправляет JSON для сущностей в Apache Atlas, используя предоставленные функции преобразования и отправки. -
generatotrProcessEntity: Функция, которая генерирует JSON для различных типов узлов в AST и преобразует их в формат, пригодный для Apache Atlas. -
generatorDataEntities: Функция, которая создает и отправляет данные сущностей для Spark, рекурсивно обрабатывая детей узлов в AST.
И обновляем методы для работы с AST
// Преобразует AST в сущности Apache Atlas и отправляет их на указанный эндпоинт def ASTToAtlasEntity(ast: AST, domain: String): Unit = { // Создает функцию отправки JSON-данных для сущностей в Apache Atlas val entitySender = senderJsonToAtlasEndpoint("entity") // Создает функцию для генерации квалифицированного имени val qualifiedName = generatorQualifiedName(domain) // Создает функцию для генерации JSON-сущностей для процессов val generatorProcessEntity = generatotrProcessEntity(domain, qualifiedName) // Создает функцию для отправки JSON-данных сущностей в Atlas val sendEntity = senderEntity(generatorProcessEntity, entitySender) // Создает функцию для генерации данных сущностей и отправки их в Atlas val generateDataEntity = generatorDataEntities(domain, entitySender) // Обрабатывает один узел AST, отправляя его как сущность в Atlas def processNode(ast: AST, intopLevelExpr: String): Unit = { sendEntity(ast, intopLevelExpr) } // Рекурсивно проходит по всему дереву AST, обрабатывая каждый узел def traverseAST(ast: AST, intopLevelExpr: String): Unit = { processNode(ast, intopLevelExpr) ast.children.foreach(ch => traverseAST(ch, ast.levelExpr)) } // Создает базовые выходные и входные сущности для указанного домена и отправляет их в Atlas createBaseOutput(domain, entitySender) createBaseInput(domain, entitySender) // Генерирует данные сущностей для AST и отправляет их в Atlas generateDataEntity(ast) // Запускает рекурсивное прохождение AST traverseAST(ast, "") } // Обогащает класс AST функцией для преобразования его в сущности Apache Atlas implicit class converter(ast: AST) { // Преобразует текущий узел AST в сущности Apache Atlas и отправляет их на указанный эндпоинт def EntityToAtlas(domain: String): Unit = { ASTToAtlasEntity(ast, domain) } }
Пояснения:
-
ASTToAtlasEntity: Основной метод, который:-
Создает функции для преобразования AST в JSON и отправки его в Apache Atlas.
-
Определяет вспомогательные функции для обработки узлов AST и рекурсивного обхода дерева.
-
Создает и отправляет базовые сущности (входные и выходные) в Atlas.
-
Рекурсивно проходит по дереву AST и отправляет каждую сущность в Atlas.
-
-
implicit class converter(ast: AST): Обогащает классAST, добавляя метод для преобразования AST в сущности Apache Atlas.-
EntityToAtlas: Использует методASTToAtlasEntityдля преобразования текущего узлаASTв сущности Atlas и отправки их в указанный домен.
-
Теперь после запуска В Apache Atlas таки появиться Linage

Чтож, на изначальный logical план вроде похоже
Project [model#17, manufacturer#18, 2024-09-12 16:57:34.046609 AS processedDDTM#36] +- Project [model#17, manufacturer#18] +- Filter NOT (manufacturer#18 = Audi) +- Union false, false :- Relation [model#17,manufacturer#18] csv +- Project [_1#23 AS model#28, _2#24 AS manufacturer#29] +- LocalRelation [_1#23, _2#24]
P.S. код можно глянуть тут
P.P.S докер фалы для запуска Apache Atlas можно взять тут
ссылка на оригинал статьи https://habr.com/ru/articles/842718/
Добавить комментарий