Создание data lineage в Apache Atlas из логических планов Spark (не без «костылей»)

от автора

Ремарка:

Текущая реализация представляет собой сырой прототип, направленный исключительно на демонстрацию возможности отображения логического плана из Apache Spark в Apache Atlas. Lfyysq прототип, по сути, является «прототипом прототипа» и служит лишь начальной отправной точкой для более глубокого анализа и разработки.

В данной работе Автор не стремимся представить окончательное или оптимальное решение. основной фокус заключается в демонстрации принципа и наметке необходимых методов для интеграции логических планов с метаданными в Apache Atlas.

Автор не призываем использовать данный подход в производственной среде в его текущем виде. Для полноценного решения задачи требуется дальнейшая проработка, включая создание специализированных библиотек, улучшение архитектуры. И все прочие прочие …

Цель Работы:

Целью данной работы является создание прототипа, демонстрирующего возможность интеграции логических планов Apache Spark с метаданными в Apache Atlas , подобно тому как это происходит в данной статье с Apache NIFI .

Тестовая задача для иллюстрации и парсинг плана в AST:

Определим небольшой фалй cars.csv со соледующим содержанием:

model,manufacturer Model S,Tesla Model 3,Tesla Mustang,Ford Civic,Honda

И напишем даг выведем его логический план:

  val spark = SparkSession.builder()     .appName("Logical Plan Example")     .master("local")     .getOrCreate()    import spark.implicits._    val carsCSV = spark     .read     .option("header", "true")     .csv("src/main/resources/cars.csv")    val carsSeq = List(     ("i8", "BMW"),     ("A4", "Audi"),     ("911", "Porsche"),     ("Corolla", "Toyota")   ).toDF("model", "manufacturer")    val unioncars = carsCSV.union(carsSeq)    val resDF = unioncars     .where(col("manufacturer") =!= "Audi")     .select("model", "manufacturer")     .withColumn("processedDDTM", lit(LocalDateTime.now()))    val logicalPlan = resDF.queryExecution.logical    println(logicalPlan) /* вывод     Project [model#17, manufacturer#18, 2024-09-12 13:00:46.880141 AS processedDDTM#36]       +- Project [model#17, manufacturer#18]          +- Filter NOT (manufacturer#18 = Audi)             +- Union false, false                :- Relation [model#17,manufacturer#18] csv                +- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]                   +- LocalRelation [_1#23, _2#24]    */ }  

Логический план представляет собой дерево, и для дальнейшей работы его необходимо преобразовать в удобную форму (AST).

Для этого мы определим класс AST, который будет отражать структуру плана в формате, удобном для последующей обработки.

// Определение корневого класса или типа для всех узлов дерева sealed trait Node {   // Метод для получения имени узла на основе его типа   def getName: String = this.getClass.toString }  // Узел типа "Проект", содержащий последовательность столбцов case class ProjectNode(columns: Seq[String]) extends Node {   // Переопределение метода getName для возврата конкретного имени узла   override def getName: String = "Project" }  // Узел типа "Фильтр", содержащий условие фильтрации case class FilterNode(condition: String) extends Node {   // Переопределение метода getName для возврата конкретного имени узла   override def getName: String = "Filter" }  // Узел типа "Объединение", указывающий, следует ли объединять все записи и по какому признаку case class UnionNode(isAll: Boolean, byName: Boolean) extends Node {   // Переопределение метода getName для возврата конкретного имени узла   override def getName: String = "Union" }  // Узел типа "Логическое отношение", содержащий последовательность столбцов case class LogicalRelationNode(columns: Seq[String]) extends Node {   // Переопределение метода getName для возврата конкретного имени узла   override def getName: String = "LogicalRelation" }  case class LocalRelationNode(columns: Seq[String]) extends Node {   override def getName: String = "LocalRelation" }  // Узел типа "Локальное отношение", содержащий последовательность столбцов case class LocalRelationNode(columns: Seq[String]) extends Node {   // Переопределение метода getName для возврата конкретного имени узла   override def getName: String = "LocalRelation" }  // Класс для представления абстрактного синтаксического дерева (AST), где каждый узел имеет тип Node, // список дочерних узлов, номер уровня и выражение уровня (необходим для индонтефикации нод на одном уровне) case class AST(node: Node,                children: Seq[AST],                level_num: Int,                levelExpr: String)

И напишем парсер из логического плана в AST

// Объект для парсинга логических планов в AST object ParserAST {    // Функция для преобразования логического плана в AST   // Возвращает Option[AST], где None означает, что план не может быть преобразован   private def parseAST(plan: LogicalPlan): Option[AST] = {      // Рекурсивная функция для обхода логического плана и создания узлов AST     // Параметры:     // - logicalPlan: текущий логический план для обработки     // - levelnum: уровень в дереве AST     // - levelExpr: строковое представление уровня и индекса     // Возвращает Option[AST], где None означает, что логический план не может быть преобразован     def loop(logicalPlan: LogicalPlan, levelnum: Int, levelExpr: String): Option[AST] = {        // Определение узла на основе типа логического плана       val node: Option[Node] = logicalPlan match {         case p: Project =>           // Обработка узла типа Project и создание узла AST с именем "Project"           val columns = p.projectList.map(_.sql)           Some(ProjectNode(columns))                    case f: Filter =>           // Обработка узла типа Filter и создание узла AST с именем "Filter"           val condition = f.condition.sql           Some(FilterNode(condition))                    case u: Union =>           // Обработка узла типа Union и создание узла AST с именем "Union"           val isAll = u.allowMissingCol           val byName = u.byName           Some(UnionNode(isAll, byName))                    case lr: LocalRelation =>           // Обработка узла типа LocalRelation и создание узла AST с именем "LocalRelation"           val columns = lr.output.map(_.sql)           Some(LocalRelationNode(columns))                    case lr: LogicalRelation =>           // Обработка узла типа LogicalRelation и создание узла AST с именем "LogicalRelation"           val columns = lr.output.map(_.sql)           Some(LogicalRelationNode(columns))                    case _ =>           // Если логический план не совпадает ни с одним из известных типов, возвращаем None           None       }        // Если узел успешно создан, создаем AST и рекурсивно обрабатываем детей       node.map { n =>         // Создание списка дочерних узлов AST, рекурсивно обрабатывая каждый дочерний план         val children = logicalPlan.children.zipWithIndex.flatMap {           case (ch, i) => loop(ch, levelnum + 1, f"${levelnum + 1}_${i}")         }.toList         // Создание узла AST с текущим узлом и его дочерними узлами         AST(n, children, levelnum, levelExpr)       }     }      // Запуск рекурсивного обхода с начальным уровнем и строковым представлением     loop(plan, 1, "1_0")   }    // Неявное преобразование для класса LogicalPlan, добавляющее метод для получения AST   implicit class parser(lp: LogicalPlan) {     def AST(): Option[AST] = {       parseAST(lp)     }   } } 

теперь можно получать AST следующим образом logicalPlan.AST().get

определим сущености в Атласе для построения Lianage:

таблица наследовательности в Apche Atlas

таблица наследовательности в Apche Atlas

Подобно тому, как в языках программирования на базе Java все классы наследуются от Object, в Apache Atlas все сущности наследуются от Referenceable. Однако построение lineage (линейности данных) происходит только для типов Process и DataSet. Если тип не наследуется от одного из этих классов (например, если наследование происходит от Asset), то кнопка «Lineage» попросту не появится.

Кроме того, сам lineage строится на основе полей inputs и outputs для Process, аналогично и для DataSet. Здесь ничего не поделаешь — придется наследоваться от этих типов, хотя большинство полей будет оставаться пустыми.

Изначально моей целью было отразить преобразования, происходящие в Apache Spark, но структура Apache Atlas вынуждает окружать мои Process сущностями DataSet в полях inputs и outputs. Хотя меня изначально интересовали только Process, эти DataSet-ы могут быть использованы для отображения схем данных, с которыми процесс начинается и которые возвращает. Однако на данном этапе я не планирую парсить схемы и оставлю каждый DataSet пустым.

В Apache Atlas кастомные сущности можно описывать с помощью формата JSON. При этом важно соблюдать правильную последовательность определения типов, иначе возникнет ошибка 404 при попытке сослаться на тип, который еще не существует в системе.

Сначала определим тип для DataSet.

  {     "enumDefs": [],     "structDefs": [],     "classificationDefs": [],     "entityDefs": [       {         "name": "pico_spark_data_type",          "description": "A type inheriting from assets for Pico DataSet",          "superTypes": ["DataSet"],         "attributeDefs": [],         "relationshipDefs": []        }     ],     "relationshipDefs": [],     "businessMetadataDefs": []   }

Комментарии:

  1. enumDefs, structDefs, classificationDefs:

    • Пустые массивы, так как перечисления, структуры и классификации не используются.

  2. entityDefs:

    • Определяет сущности в системе.

    • name: Имя сущности, которая представляет тип данных.

    • description: Описание сущности.

    • superTypes: Суперклассы, от которых наследуется данная сущность.

    • attributeDefs: Пустой массив, так как атрибуты не указаны.

    • relationshipDefs: Пустой массив, так как связи не определены.

  3. relationshipDefs, businessMetadataDefs:

    • Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.

{     "enumDefs": [],     "structDefs": [],     "classificationDefs": [],     "entityDefs": [       {         "name": "pico_spark_process_type",         "description": "A type inheriting from assets for Pico Spark abstraction",         "superTypes": ["Process"],         "attributeDefs": [           {             "name": "inputs",             "description": "List of inputs for the process",             "typeName": "array<pico_spark_data_type>",             "isOptional": true           },           {             "name": "outputs",             "description": "List of outputs for the process",             "typeName": "array<pico_spark_data_type>",             "isOptional": true           }         ],         "relationshipDefs": []       }     ],     "relationshipDefs": [],     "businessMetadataDefs": []   }

Комментарии:

  1. enumDefs, structDefs, classificationDefs:

    • Пустые массивы, так как перечисления, структуры и классификации не используются в данном определении.

  2. entityDefs:

    • Содержит определения сущностей.

    • name: Имя сущности, определяющей тип данных в контексте Pico Spark.

    • description: Описание сущности.

    • superTypes: Суперклассы, от которых сущность наследуется.

    • attributeDefs: Пустой массив, так как атрибуты не добавлены.

    • relationshipDefs: Пустой массив, так как связи не указаны.

  3. relationshipDefs, businessMetadataDefs:

    • Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.

Для типа pico_spark_process_type я также создаю наследников для всех типов узлов (Filter, Project, Union и т.д.) в AST. Однако здесь я опущу это, поскольку это займет слишком много места и будет слишком однообразно.

В этих JSON-ах много пустых сущностей, но без них не обойтись, так как без них типы в Apache Atlas не создаются.

Взаимодействие с Apache Atlas по REST:

Простого описания сущностей недостаточно — их нужно передать в Apache Atlas. У Atlas есть обширное REST API для взаимодействия с системой. Конкретно процесс создания нового типа выглядит следующим образом:

curl -X POST "http://<atlas-server-url>/api/atlas/v2/types/typedefs" \      -H "Content-Type: application/json" \      -H "Accept: application/json" \      -d '{            "enumDefs": [],            "structDefs": [],            "classificationDefs": [],            "entityDefs": [              {                "name": "pico_spark_data_type",                "description": "A type inheriting from assets for Pico DataSet",                "superTypes": ["DataSet"],                "attributeDefs": [],                "relationshipDefs": []              }            ],            "relationshipDefs": [],            "businessMetadataDefs": []          }' 

создаю JSON файл где будут перечислены тела запросов для всх необходимых кастомных типов под названием EntityTypes.json
и создам метод который читает этот файл и делает запрос на каждый EntityType

  val atlasServerUrl = "http://localhost:21000/api/atlas/v2"   val authHeader: String = "Basic " + java.util.Base64.getEncoder.encodeToString("admin:admin".getBytes)  def generatePicoSparkTypes(): Unit = {    // Функция для чтения содержимого файла из ресурсов   def readFileFromResources(fileName: String): String = {     val source = Source.fromResource(fileName)     try source.mkString     finally source.close()   }    // Чтение JSON из файла ресурсов   val jsonString = readFileFromResources("EntityTypes.json")    // Попытка разобрать строку JSON в структуру данных   val parsedJson: Either[ParsingFailure, Json] = parse(jsonString)    // Преобразование разобранного JSON в список объектов JSON   val jsonObjects: Option[List[Json]] = parsedJson match {     case Right(json) =>       json.as[List[Json]] match {         case Right(jsonArray) =>           Some(jsonArray)         case Left(error) =>           // Обработка ошибки разбора массива JSON           println(s"Error parsing JSON array: $error")           None       }     case Left(error) =>       // Обработка ошибки разбора JSON       println(s"Error parsing JSON: $error")       None   }    // Отправка каждого объекта JSON на сервер Atlas   jsonObjects match {     case Some(jsonArray) =>       jsonArray.foreach { jsonBody =>         // Создание POST-запроса для создания типа в Apache Atlas         val createTypeRequest = basicRequest           .method(Method.POST, uri"$atlasServerUrl/types/typedefs") // Метод POST и URL для запроса           .header("Authorization", authHeader) // Заголовок авторизации           .header("Content-Type", "application/json") // Заголовок типа содержимого           .header("Accept", "application/json") // Заголовок для принятия ответа в формате JSON           .body(jsonBody.noSpaces) // Тело запроса с JSON-данными           .response(asString) // Ожидание ответа в формате строки          // Отправка запроса и вывод результата         val response = createTypeRequest.send(backend)         println(response.body) // Печать тела ответа         println(response.code) // Печать кода ответа       }     case None =>       // Сообщение, если JSON-объекты не были найдены       println("No JSON objects found.")   }  } 

коментарии:

  1. readFileFromResources: Функция для чтения содержимого файла JSON из ресурсов.

  2. jsonString: Получение строки JSON из файла.

  3. parsedJson: Попытка разобрать строку JSON в структуру данных Json.

  4. jsonObjects: Преобразование разобранного JSON в список объектов JSON.

  5. jsonArray.foreach: Для каждого объекта JSON создается и отправляется POST-запрос на сервер Atlas.

  6. createTypeRequest: Создание POST-запроса с JSON-данными для создания типов в Apache Atlas.

  7. response: Отправка запроса и вывод результата, включая тело ответа и код ответа.

теперь для создания всех энтити в Apache Atlas достаточно вызвать метод
generatePicoSparkTypes()

Поскольку DataSet сущности уже созданы, можно сразу приступить к созданию Process сущностей с заполненными полями inputs и outputs. Это важно, так как при попытках обновления сущностей через API ничего не сработало. Начнем с определения набора методов:

EntityTypes в Apache Atlas

EntityTypes в Apache Atlas

как видим все EntityType созданы

Создаем DataSet Entity:

перед тем как создовать сущьгъности процкссов нужно создать сущьности DataSet-тов, поскольк первые ссылаються на вторые

на данном уже определен pico_spark_data_type который отвечает за входные / выходные схемы данных
для начала определимся сдвумя вспомогательными мктодами

/**  * Создает функцию для отправки JSON данных на указанный эндпоинт в Apache Atlas.  *  * @param postfix Строка, добавляемая к базовому URL для формирования полного URL эндпоинта.  * @return Функция, принимающая JSON строку и отправляющая ее на сервер через HTTP POST запрос.  */ def senderJsonToAtlasEndpoint(postfix: String): String => Unit = {    jsonBody => {     // Создание HTTP POST запроса для отправки JSON данных на сервер     val createTypeRequest = basicRequest       .method(Method.POST, uri"$atlasServerUrl/${postfix}")       .header("Authorization", authHeader)       .header("Content-Type", "application/json")       .header("Accept", "application/json")       .body(jsonBody)       .response(asString)      // Отправка запроса и получение ответа     val response = createTypeRequest.send(backend)          // Вывод тела ответа и кода статуса     println(response.body)     println(response.code)   } }  /**  * Генерирует и отправляет сущности данных Spark в Apache Atlas для указанного домена.  *  * @param domain Домен, который будет использоваться в атрибутах сущностей.  * @param execJsonAtlas Функция для отправки JSON данных в Apache Atlas.  * @return Функция, принимающая AST и создающая JSON для каждой дочерней сущности.  */ def generateSparkDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {    // Локальная функция для генерации и отправки сущностей данных Spark   def generateEntities(ast: AST): Unit = {     ast.children.foreach { inast =>       // Формирование JSON тела для сущности данных Spark       val jsonBody =         f"""            |{            |  "entity": {            |    "typeName": "pico_spark_data_type",            |    "attributes": {            |      "domain": "${domain}",            |      "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",            |      "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",            |      "description": "A description for the spark_data"            |    }            |  }            |}            |""".stripMargin        // Отправка сформированного JSON тела на сервер       execJsonAtlas(jsonBody)              // Рекурсивный вызов для обработки дочерних узлов       generateEntities(inast)     }   }    // Возвращаем функцию для генерации сущностей   generateEntities } 

Пояснения:

  • senderJsonToAtlasEndpoint: Эта функция создает и возвращает другую функцию, которая отправляет JSON данные на указанный эндпоинт в Apache Atlas. Комментарии объясняют параметры, создание запроса, отправку и обработку ответа.

  • generateSparkDataEntities: Эта функция генерирует сущности данных Spark, формирует соответствующий JSON и отправляет его в Apache Atlas, используя переданную функцию для отправки. Комментарии описывают параметры и внутреннюю логику функции, включая рекурсивный вызов для обработки всех дочерних узлов.

Напишкм еще 2 метода для запуска формирования Linage В Atlas

/**  * Преобразует AST (абстрактное синтаксическое дерево) в сущности Apache Atlas и отправляет их на сервер.  *  * @param ast Абстрактное синтаксическое дерево, представляющее структуру данных.  * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.  * @param topLevelExpr Выражение уровня, используемое для определения уровня в AST. В данном случае не используется.  */ def ASTToAtlasEntity(ast: AST, domain: String, topLevelExpr: String): Unit = {    // Создание функции для отправки JSON данных на эндпоинт "entity" в Apache Atlas   val entitySender = senderJsonToAtlasEndpoint("entity")      // Создание функции для генерации сущностей данных Spark и отправки их в Apache Atlas   val sparkDataEntityGenerator = generateSparkDataEntities(domain, entitySender)    // Создание базовых сущностей вывода и отправка их на сервер   //ее реализацию опущу   createBaseOutput(domain, entitySender)      // Создание базовых сущностей ввода и отправка их на сервер   //ее реализацию опущу   createBaseInput(domain, entitySender)      // Генерация и отправка сущностей данных Spark на основе AST   sparkDataEntityGenerator(ast) }  /**  * Имплементация расширения для преобразования AST в сущности Apache Atlas.  *  * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.  */ implicit class converter(ast: AST) {    /**    * Преобразует текущее AST в сущности Apache Atlas и отправляет их на сервер.    *    * @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.    */   def EntityToAtlas(domain: String): Unit = {     ASTToAtlasEntity(ast, domain, "")   } } 

Пояснения:

  • ASTToAtlasEntity: Этот метод преобразует переданное AST в сущности Apache Atlas и отправляет их на сервер. Он использует вспомогательные функции для создания базовых сущностей и генерации сущностей данных Spark, а также отправляет их на сервер через созданную функцию entitySender.

  • EntityToAtlas: Это метод расширения (implicit class) для типа AST, который упрощает вызов метода ASTToAtlasEntity с дефолтным значением для topLevelExpr. Этот метод предоставляет удобный способ преобразования AST в сущности Apache Atlas, используя указанный домен.

Теперь при запуске ast.EntityToAtlas("picoDomain")В атласе появляеться data entity

скриншот с web UI

скриншот с web UI

так как DataSet Entity уже созданы, то можно создовать Process Entity сразу с заролнеными inputs и outputs, это важно поскольку сколько я не тыкалась в Api для обновления Entuty ничкго не работало.

начнем с того что определим пачку методов:

  // Создает функцию для отправки сущностей в Apache Atlas   // Использует функцию преобразования AST в JSON и функцию отправки JSON   def senderEntity(nodeToAtlasCreateEntityJson: (AST, String) => String, execJsonAtlas: String => Unit): (AST, String) => Unit = {     // Возвращает функцию, которая преобразует AST в JSON и отправляет его в Atlas     (ast: AST, topLevelExpr: String) => {       val jsonBody = nodeToAtlasCreateEntityJson(ast, topLevelExpr)       execJsonAtlas(jsonBody)     }   }    // Генерирует JSON для сущностей в Atlas на основе AST и уровня   // Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.   def generatotrProcessEntity(domain: String, qualifiedName: (Node, String) => String): (AST, String) => String = {     (ast: AST, topLevelExpr: String) => {       val node = ast.node        // Создает список входных сущностей, если есть дочерние элементы       val inputs = if (ast.children.nonEmpty) {         ast.children.map(_.levelExpr).map { expr =>           f"""              |              |{              |  "typeName": "pico_spark_data_type",              |  "uniqueAttributes": {              |    "qualifiedName": "pico_spark_data_${ast.levelExpr}-${expr}@${domain}"              |  }              |}              |              |""".stripMargin         }.mkString(", ")       } else {         f"""            | {            |  "typeName": "pico_spark_data_type",            |   "uniqueAttributes": {            |    "qualifiedName": "pico_spark_data_input@${domain}"            |   }            | }            |""".stripMargin       }        // Создает JSON для выходных сущностей, если задан topLevelExpr       val output = if (topLevelExpr.nonEmpty) {         f"""            | {            |  "typeName": "pico_spark_data_type",            |   "uniqueAttributes": {            |      "qualifiedName": "pico_spark_data_${topLevelExpr}-${ast.levelExpr}@${domain}"            |   }            | }            |""".stripMargin       } else {         f"""            | {            |  "typeName": "pico_spark_data_type",            |   "uniqueAttributes": {            |    "qualifiedName": "pico_spark_data_output@${domain}"            |   }            | }            |""".stripMargin       }        // Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.       node match {         case p: ProjectNode =>           f"""              |{              |"entity": {              |      "typeName": "pico_spark_project_type",              |      "attributes": {              |        "qualifiedName": "${qualifiedName(node, ast.levelExpr)}",              |        "name": "pico_project_${ast.levelExpr}",              |        "description": "This is an project for the pico_spark_project_type",              |        "columns": [${p.columns.map(col => "\"" + col + "\"").mkString(", ")}],              |        "inputs":[ ${inputs} ],              |        "outputs":[ ${output} ]              |      }              |    }              |}              |""".stripMargin         case ...        }     }   }    // Создает функцию для генерации и отправки сущностей в Apache Atlas   // Использует предоставленные функции для создания JSON и отправки его в Atlas   def generatorDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {      def sparkDataEntitys(ast: AST): Unit = {       ast.children.foreach { inast =>         val jsonBody =           f"""              |{              |  "entity": {              |    "typeName": "pico_spark_data_type",              |    "attributes": {              |      "domain": "${domain}",              |      "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",              |      "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",              |      "description": "A description for the spark_data"              |    }              |  }              |}              |""".stripMargin          execJsonAtlas(jsonBody)         sparkDataEntitys(inast)       }     }      // Возвращает функцию, которая генерирует и отправляет сущности данных для Spark     sparkDataEntitys   } 

Пояснения:

  • senderEntity: Функция, которая создает и отправляет JSON для сущностей в Apache Atlas, используя предоставленные функции преобразования и отправки.

  • generatotrProcessEntity: Функция, которая генерирует JSON для различных типов узлов в AST и преобразует их в формат, пригодный для Apache Atlas.

  • generatorDataEntities: Функция, которая создает и отправляет данные сущностей для Spark, рекурсивно обрабатывая детей узлов в AST.

И обновляем методы для работы с AST

   // Преобразует AST в сущности Apache Atlas и отправляет их на указанный эндпоинт   def ASTToAtlasEntity(ast: AST, domain: String): Unit = {      // Создает функцию отправки JSON-данных для сущностей в Apache Atlas     val entitySender = senderJsonToAtlasEndpoint("entity")      // Создает функцию для генерации квалифицированного имени     val qualifiedName = generatorQualifiedName(domain)      // Создает функцию для генерации JSON-сущностей для процессов     val generatorProcessEntity = generatotrProcessEntity(domain, qualifiedName)      // Создает функцию для отправки JSON-данных сущностей в Atlas     val sendEntity = senderEntity(generatorProcessEntity, entitySender)      // Создает функцию для генерации данных сущностей и отправки их в Atlas     val generateDataEntity = generatorDataEntities(domain, entitySender)      // Обрабатывает один узел AST, отправляя его как сущность в Atlas     def processNode(ast: AST, intopLevelExpr: String): Unit = {       sendEntity(ast, intopLevelExpr)     }      // Рекурсивно проходит по всему дереву AST, обрабатывая каждый узел     def traverseAST(ast: AST, intopLevelExpr: String): Unit = {       processNode(ast, intopLevelExpr)       ast.children.foreach(ch => traverseAST(ch, ast.levelExpr))     }      // Создает базовые выходные и входные сущности для указанного домена и отправляет их в Atlas     createBaseOutput(domain, entitySender)     createBaseInput(domain, entitySender)      // Генерирует данные сущностей для AST и отправляет их в Atlas     generateDataEntity(ast)      // Запускает рекурсивное прохождение AST     traverseAST(ast, "")   }    // Обогащает класс AST функцией для преобразования его в сущности Apache Atlas   implicit class converter(ast: AST) {      // Преобразует текущий узел AST в сущности Apache Atlas и отправляет их на указанный эндпоинт     def EntityToAtlas(domain: String): Unit = {       ASTToAtlasEntity(ast, domain)     }    } 

Пояснения:

  • ASTToAtlasEntity: Основной метод, который:

    • Создает функции для преобразования AST в JSON и отправки его в Apache Atlas.

    • Определяет вспомогательные функции для обработки узлов AST и рекурсивного обхода дерева.

    • Создает и отправляет базовые сущности (входные и выходные) в Atlas.

    • Рекурсивно проходит по дереву AST и отправляет каждую сущность в Atlas.

  • implicit class converter(ast: AST): Обогащает класс AST, добавляя метод для преобразования AST в сущности Apache Atlas.

    • EntityToAtlas: Использует метод ASTToAtlasEntity для преобразования текущего узла AST в сущности Atlas и отправки их в указанный домен.

Теперь после запуска В Apache Atlas таки появиться Linage

Чтож, на изначальный logical план вроде похоже

Project [model#17, manufacturer#18, 2024-09-12 16:57:34.046609 AS processedDDTM#36] +- Project [model#17, manufacturer#18]    +- Filter NOT (manufacturer#18 = Audi)       +- Union false, false          :- Relation [model#17,manufacturer#18] csv          +- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]             +- LocalRelation [_1#23, _2#24]

P.S. код можно глянуть тут
P.P.S докер фалы для запуска Apache Atlas можно взять тут


ссылка на оригинал статьи https://habr.com/ru/articles/842718/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *