Как Flink Table API упрощает разработку

от автора

Всем привет!

В этой статье мы посмотрим на два варианта решения «типичной» задачи потоковой обработки с учетом состояния при помощи фреймворка Apache Flink.

Если вы только начинаете знакомство с Flink, здесь можно найти обильное количество материалов.

Постановка задачи

Итак, давайте представим базу данных (пусть это будет PostgreSQL) состоящую из трех таблиц:

  1. Clients — общая информация по клиенту:

Имя поля

Тип поля

Описание

clientId

int

уникальный идентификатор клиента

name

string

имя клиента

surname

string

фамилия клиента

patronymic

optional[string]

отчество клиента (необязательное поле)

sex

string

пол

  1. ClientCompany — информация о компании клиента:

Имя поля

Тип поля

Описание

clientId

int

уникальный идентификатор клиента

companyId

int

уникальный идентификатор компании

companyName

string

название компании

  1. Payment — информация о платежах клиента:

Имя поля

Тип поля

Описание

clientId

int

уникальный идентификатор клиента

amount

int

сумма платежа

tmMs

long

время свершения платежа

Будем получать обновления по всем таблицам через CDC-канал (пусть это будет Debezium) в топик Kafka в формате Json. Задачей нашей Job-ы будет слушать топики и выдавать обновления по клиенту (например: клиент сменил компанию) во внешнюю систему (в рамках статьи в качестве внешней системы будем использовать консоль вывода).

Изображение

Рисунок 1 — Схема бизнесс процесса

Начинаем программировать

Наш технологический стек:

  1. Scala v2.12.10

  2. Flink v1.16.0

  3. Circe v0.14.3

Мы разберем 2 варианта решения:

  1. При помощи Table API

  2. При помощи классического State Processor API

Прежде чем писать основную логику нам потребуется реализовать несколько общих шагов, которыми мы будем пользоваться в обеих реализациях.

Первое что нам потребуется — это научиться читать топики Kafka, в этом нам поможет уже готовый Flink Kafka Connector:

sealed trait KafkaConsumerSource {    def configure[A: Decoder: ClassTag](bootstrapServers: String, topicName: String): KafkaSource[A] = {     KafkaSource       .builder()       .setBootstrapServers(bootstrapServers)       .setGroupId(s"$topicName-group")       .setTopics(topicName)       .setDeserializer(new KafkaJsonRecordDeserializer[A])       .setStartingOffsets(OffsetsInitializer.earliest())       .build()   } }  object KafkaConsumerSource extends KafkaConsumerSource {    def configureKafkaDataSource[A: Decoder: ClassTag](       topicName: String   )(implicit env: StreamExecutionEnvironment,     typeInformation: TypeInformation[A]): DataStream[A] = {     val kafkaSource = KafkaConsumerSource.configure[A](Kafka.BootstrapServers, topicName)     env.fromSource[A](kafkaSource, WatermarkStrategy.noWatermarks[A](), s"$topicName-flow")   } }  class KafkaJsonRecordDeserializer[A: Decoder: ClassTag] extends KafkaRecordDeserializationSchema[A] {    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[A]): Unit = {     val value = new String(record.value())     val obj   = Serde.toObj(value).toOption // в случае неудачи при десериализации, ошибка просто игнорируется и событие отбрасывается (в реальных проектах так делать не стоит)     obj.foreach(out.collect)   }    override def getProducedType: TypeInformation[A] = {     TypeExtractor       .getForClass(classTag[A].runtimeClass.asInstanceOf[Class[A]])       .asInstanceOf[TypeInformation[A]]   } } 

В сериализации/десериализации сообщений нам поможет Circe:

object Serde {    def toObj[A: Decoder](json: String): Either[circe.Error, A] = decode[A](json)    def toJson[A: Encoder](obj: A): String = obj.asJson.toString() } 

Мы немного упростим себе задачу, сымитируем поведение CDC-канала, отправляя сообщения требуемого формата напрямую в топик Kafka:

case class Client (before: Option[Client.Value], after: Option[Client.Value], op: String)  object Client {   case class Value(clientId: Int, name: String, surname: String, patronymic: Option[String], sex: String) }  case class ClientCompany (before: Option[ClientCompany.Value], after: Option[ClientCompany.Value], op: String)  object ClientCompany {   case class Value(clientId: Int, companyId: Int, companyName: String) }  case class Payment (before: Option[Payment.Value], after: Option[Payment.Value], op: String)  object Payment {   case class Value(clientId: Int, amount: Int, tmMs: Long) }  

Теперь мы готовы приступить к основной части.

Использование Flink Table API

Итак, прежде всего нам нужно получить необходимое окружение, поскольку мы будем работать в потоковом режиме, заручимся таковым:

implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

Далее получим окружение для доступа к Table API :

implicit val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) 

Формируем Kafka Source при помощи кода который мы подготовили выше:

val clients: DataStream[Client] = KafkaConsumerSource.configureKafkaDataSource[Client]("clients") 

Теперь нам остался всего один шаг до получения представления с которым нам предстоит работать — Dynamic Tables. Чтобы сделать этот шаг максимально простым, воспользуемся возможностями Scala и расширим DataStream методом toStreamTable:

implicit class DataStreamOps[A: RowTransformer](val stream: DataStream[A]) {    def toStreamTable(primaryKey: String*)(implicit env: StreamTableEnvironment): Table = {     val transformer = RowTransformer[A]     stream       .map(transformer.toRow(_))       .flatMap(_.toSeq)(transformer.typeInformation)       .toChangelogTable(         env,         Schema           .newBuilder()           .primaryKey(primaryKey: _*)           .build(),         ChangelogMode.upsert()       )   } } 

На вход функции подается список имен полей, которые составят первичный ключ нашей таблицы. Как и в большинстве фреймворков наименьшей единицей для работы с таблицами является строка (Row). Нужен способ преобразования всех наших сущностей в тип Row, продолжим пользоваться возможностями Scala, на этот раз тем что Scala позволяет писать код в функциональном стиле, а конкретно мы воспользуемся Type Class-ми. Наш Type Class будет иметь следующий вид:

sealed trait RowTransformer[A] {   def toRow(entity: A): Option[Row]   protected val fieldsName: Array[String]   implicit def typeInformation: TypeInformation[Row] } 

Представим реализацию для одной из таблиц (для остальных алгоритм будет аналогичен):

implicit case object ClientRow extends RowTransformer[Client] {   override def toRow(entity: Client): Option[Row] = {     val kind   = kindOf(entity.operation)     val record = entity.before.orElse(entity.after)     record.map(r => Row.ofKind(kind, Int.box(r.clientId), r.name, r.surname, r.patronymic.orNull, r.sex))   }    override implicit def typeInformation: TypeInformation[Row] = {     Types.ROW_NAMED(fieldsName, Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)   }    override protected val fieldsName: Array[String] = Array("clientId", "name", "surname", "patronymic", "sex") } 

Теперь мы можем преобразовывать DataStream в Table одним действием:

val clients: Table = KafkaConsumerSource.configureKafkaDataSource[Client]("clients")   .toStreamTable("clientId") 

Это далеко не единственный способ создания таблиц, если мы изначально хотим работать с данными как с таблицей, необязательно сперва представлять данные в виде DataStream:

tableEnv.createTable(   "companies",   TableDescriptor     .forConnector("kafka")     .schema(       Schema         .newBuilder()         .column("clientId", DataTypes.INT().notNull())         .column("companyId", DataTypes.INT().notNull())         .column("companyName", DataTypes.STRING().notNull())         .primaryKey("clientId")         .build()     )     .option(KafkaConnectorOptions.TOPIC, Seq("companies").asJava)     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, Kafka.BootstrapServers)     .option(KafkaConnectorOptions.PROPS_GROUP_ID, "companies-group")     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET)     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")     .build() )  val companies: Table = tableEnv.sqlQuery("SELECT * FROM companies")  companies.print("Companies") 

И конечно же куда без SQL:

tableEnv.executeSql(   """     |CREATE TABLE payments (clientId INT NOT NULL, amount INT NOT NULL, tmMs BIGINT NOT NULL)     |WITH (     | 'connector' = 'kafka',     | 'topic' = 'payments',     | 'properties.bootstrap.servers' = 'localhost:9092',     | 'properties.group.id' = 'payments-group',     | 'scan.startup.mode' = 'earliest-offset',     | 'format' = 'debezium-json'     |)     |""".stripMargin)  val payments: Table = tableEnv.sqlQuery("SELECT * FROM payments")  payments.print("Payments") 

Мы окончательно подготовились к написанию основной логики. Для нашей задачи будем использовать inner join, т.е. при получении нового события мы будем выдавать результат во внешнюю систему только в случае наличия данных по ключу во всех таблицах.

clients   .join(companies, $"client.clientId" === $"company.clientId")   .join(payments, $"client.clientId" === $"payment.clientId")   .select("client.clientId", "name", "surname", "companyId", "companyName", "amount", "timestamp")   .toChangelogStream(Schema.derived(), ChangelogMode.upsert())   .print("Portfolio") 
Изображение

Рисунок 2 — Dynamic Tables

Использование State Processor API

Снова начнем с окружения, в этом варианте нам будет достаточно только окружения для работы в потоковом режиме:

implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

В прошлом варианте мы приводили все наши сущности к типу Table, сейчас нам нужно сделать нечто похожее:

case class PortfolioState(    id: Int,    client: Option[Client.Value],    company: Option[ClientCompany.Value],    payment: Option[Payment.Value]) {    def complete: Option[Portfolio] = {     for {       client  <- client       company <- company       payment <- payment     } yield {       Portfolio(         client.clientId,         client.name,         client.surname,         company.companyId,         company.companyName,         payment.amount,         payment.timestamp       )     }   } } 

Мы описали свой собственный аналог таблицы, добавив в него метод complete формирующий результат только в случае наличия данных по всем трем сущностям (аналог inner join). Опишем способ трансформации исходной сущности в наш тип (для остальных сущностей будет аналогично):

object PortfolioState {    def apply(client: Client): PortfolioState = {     val id = client.before.map(_.clientId).getOrElse(client.after.map(_.clientId).get)     PortfolioState(id, client.before.orElse(client.after), None, None)   } } 

Итоговый результат:

val clients: DataStream[PortfolioState] = KafkaConsumerSource.configureKafkaDataSource[Client]("clients").map(PortfolioState(_)) 

Последнее чего нам не хватает — это обработчика событий, который будет поддерживать актуальное состояние и выдавать результаты:

class StateProcessor extends KeyedProcessFunction[Int, PortfolioState, Portfolio] {    private var state: ValueState[PortfolioState] = _    override def open(parameters: Configuration): Unit = {     state = getRuntimeContext.getState(       new ValueStateDescriptor[PortfolioState]("state", createTypeInformation[PortfolioState])     )   }    override def processElement(      value: PortfolioState,      ctx: KeyedProcessFunction[Int, PortfolioState, Portfolio]#Context,      ut: Collector[Portfolio]   ): Unit = {     Option(state.value()).fold {       state.update(value) // если state == null, инициализируем его входным значением     } { currentState =>       val updatedState = currentState.copy(         client = value.client.orElse(currentState.client),         company = value.company.orElse(currentState.company),         payment = value.payment.orElse(currentState.payment)       )       updatedState.complete.foreach(out.collect) // если данных хватает, то выдаем результат       state.update(updatedState)     }   } } 

Соберем все вместе:

clients   .union(companies) // объединяем все три потока в один   .union(payments)   .keyBy(_.id) // шардиурем данные по полю id (clientId)   .process(new PortfolioStateProcessor())   .print("Portfolio") 

Итоги

Мы рассмотрели 2 разных подхода, даже несмотря на всю простоту постановки подход с применением Table API для задач отслеживания изменений и моментальной выдачи результат выглядит привлекательнее, мы получаем возможность описывать желаемый результат на привычном SQL-подобном синтаксисе находясь в бесконечном потоке событий.

Полный код проекта можно найти здесь


ссылка на оригинал статьи https://habr.com/ru/companies/neoflex/articles/745730/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *