
Всем привет!
В этой статье мы посмотрим на два варианта решения «типичной» задачи потоковой обработки с учетом состояния при помощи фреймворка Apache Flink.
Если вы только начинаете знакомство с Flink, здесь можно найти обильное количество материалов.
Постановка задачи
Итак, давайте представим базу данных (пусть это будет PostgreSQL) состоящую из трех таблиц:
-
Clients — общая информация по клиенту:
|
Имя поля |
Тип поля |
Описание |
|---|---|---|
|
clientId |
int |
уникальный идентификатор клиента |
|
name |
string |
имя клиента |
|
surname |
string |
фамилия клиента |
|
patronymic |
optional[string] |
отчество клиента (необязательное поле) |
|
sex |
string |
пол |
-
ClientCompany — информация о компании клиента:
|
Имя поля |
Тип поля |
Описание |
|---|---|---|
|
clientId |
int |
уникальный идентификатор клиента |
|
companyId |
int |
уникальный идентификатор компании |
|
companyName |
string |
название компании |
-
Payment — информация о платежах клиента:
|
Имя поля |
Тип поля |
Описание |
|---|---|---|
|
clientId |
int |
уникальный идентификатор клиента |
|
amount |
int |
сумма платежа |
|
tmMs |
long |
время свершения платежа |
Будем получать обновления по всем таблицам через CDC-канал (пусть это будет Debezium) в топик Kafka в формате Json. Задачей нашей Job-ы будет слушать топики и выдавать обновления по клиенту (например: клиент сменил компанию) во внешнюю систему (в рамках статьи в качестве внешней системы будем использовать консоль вывода).
Начинаем программировать
Наш технологический стек:
Мы разберем 2 варианта решения:
-
При помощи Table API
-
При помощи классического State Processor API
Прежде чем писать основную логику нам потребуется реализовать несколько общих шагов, которыми мы будем пользоваться в обеих реализациях.
Первое что нам потребуется — это научиться читать топики Kafka, в этом нам поможет уже готовый Flink Kafka Connector:
sealed trait KafkaConsumerSource { def configure[A: Decoder: ClassTag](bootstrapServers: String, topicName: String): KafkaSource[A] = { KafkaSource .builder() .setBootstrapServers(bootstrapServers) .setGroupId(s"$topicName-group") .setTopics(topicName) .setDeserializer(new KafkaJsonRecordDeserializer[A]) .setStartingOffsets(OffsetsInitializer.earliest()) .build() } } object KafkaConsumerSource extends KafkaConsumerSource { def configureKafkaDataSource[A: Decoder: ClassTag]( topicName: String )(implicit env: StreamExecutionEnvironment, typeInformation: TypeInformation[A]): DataStream[A] = { val kafkaSource = KafkaConsumerSource.configure[A](Kafka.BootstrapServers, topicName) env.fromSource[A](kafkaSource, WatermarkStrategy.noWatermarks[A](), s"$topicName-flow") } } class KafkaJsonRecordDeserializer[A: Decoder: ClassTag] extends KafkaRecordDeserializationSchema[A] { override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[A]): Unit = { val value = new String(record.value()) val obj = Serde.toObj(value).toOption // в случае неудачи при десериализации, ошибка просто игнорируется и событие отбрасывается (в реальных проектах так делать не стоит) obj.foreach(out.collect) } override def getProducedType: TypeInformation[A] = { TypeExtractor .getForClass(classTag[A].runtimeClass.asInstanceOf[Class[A]]) .asInstanceOf[TypeInformation[A]] } }
В сериализации/десериализации сообщений нам поможет Circe:
object Serde { def toObj[A: Decoder](json: String): Either[circe.Error, A] = decode[A](json) def toJson[A: Encoder](obj: A): String = obj.asJson.toString() }
Мы немного упростим себе задачу, сымитируем поведение CDC-канала, отправляя сообщения требуемого формата напрямую в топик Kafka:
case class Client (before: Option[Client.Value], after: Option[Client.Value], op: String) object Client { case class Value(clientId: Int, name: String, surname: String, patronymic: Option[String], sex: String) } case class ClientCompany (before: Option[ClientCompany.Value], after: Option[ClientCompany.Value], op: String) object ClientCompany { case class Value(clientId: Int, companyId: Int, companyName: String) } case class Payment (before: Option[Payment.Value], after: Option[Payment.Value], op: String) object Payment { case class Value(clientId: Int, amount: Int, tmMs: Long) }
Теперь мы готовы приступить к основной части.
Использование Flink Table API
Итак, прежде всего нам нужно получить необходимое окружение, поскольку мы будем работать в потоковом режиме, заручимся таковым:
implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
Далее получим окружение для доступа к Table API :
implicit val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
Формируем Kafka Source при помощи кода который мы подготовили выше:
val clients: DataStream[Client] = KafkaConsumerSource.configureKafkaDataSource[Client]("clients")
Теперь нам остался всего один шаг до получения представления с которым нам предстоит работать — Dynamic Tables. Чтобы сделать этот шаг максимально простым, воспользуемся возможностями Scala и расширим DataStream методом toStreamTable:
implicit class DataStreamOps[A: RowTransformer](val stream: DataStream[A]) { def toStreamTable(primaryKey: String*)(implicit env: StreamTableEnvironment): Table = { val transformer = RowTransformer[A] stream .map(transformer.toRow(_)) .flatMap(_.toSeq)(transformer.typeInformation) .toChangelogTable( env, Schema .newBuilder() .primaryKey(primaryKey: _*) .build(), ChangelogMode.upsert() ) } }
На вход функции подается список имен полей, которые составят первичный ключ нашей таблицы. Как и в большинстве фреймворков наименьшей единицей для работы с таблицами является строка (Row). Нужен способ преобразования всех наших сущностей в тип Row, продолжим пользоваться возможностями Scala, на этот раз тем что Scala позволяет писать код в функциональном стиле, а конкретно мы воспользуемся Type Class-ми. Наш Type Class будет иметь следующий вид:
sealed trait RowTransformer[A] { def toRow(entity: A): Option[Row] protected val fieldsName: Array[String] implicit def typeInformation: TypeInformation[Row] }
Представим реализацию для одной из таблиц (для остальных алгоритм будет аналогичен):
implicit case object ClientRow extends RowTransformer[Client] { override def toRow(entity: Client): Option[Row] = { val kind = kindOf(entity.operation) val record = entity.before.orElse(entity.after) record.map(r => Row.ofKind(kind, Int.box(r.clientId), r.name, r.surname, r.patronymic.orNull, r.sex)) } override implicit def typeInformation: TypeInformation[Row] = { Types.ROW_NAMED(fieldsName, Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING) } override protected val fieldsName: Array[String] = Array("clientId", "name", "surname", "patronymic", "sex") }
Теперь мы можем преобразовывать DataStream в Table одним действием:
val clients: Table = KafkaConsumerSource.configureKafkaDataSource[Client]("clients") .toStreamTable("clientId")
Это далеко не единственный способ создания таблиц, если мы изначально хотим работать с данными как с таблицей, необязательно сперва представлять данные в виде DataStream:
tableEnv.createTable( "companies", TableDescriptor .forConnector("kafka") .schema( Schema .newBuilder() .column("clientId", DataTypes.INT().notNull()) .column("companyId", DataTypes.INT().notNull()) .column("companyName", DataTypes.STRING().notNull()) .primaryKey("clientId") .build() ) .option(KafkaConnectorOptions.TOPIC, Seq("companies").asJava) .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, Kafka.BootstrapServers) .option(KafkaConnectorOptions.PROPS_GROUP_ID, "companies-group") .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET) .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json") .build() ) val companies: Table = tableEnv.sqlQuery("SELECT * FROM companies") companies.print("Companies")
И конечно же куда без SQL:
tableEnv.executeSql( """ |CREATE TABLE payments (clientId INT NOT NULL, amount INT NOT NULL, tmMs BIGINT NOT NULL) |WITH ( | 'connector' = 'kafka', | 'topic' = 'payments', | 'properties.bootstrap.servers' = 'localhost:9092', | 'properties.group.id' = 'payments-group', | 'scan.startup.mode' = 'earliest-offset', | 'format' = 'debezium-json' |) |""".stripMargin) val payments: Table = tableEnv.sqlQuery("SELECT * FROM payments") payments.print("Payments")
Мы окончательно подготовились к написанию основной логики. Для нашей задачи будем использовать inner join, т.е. при получении нового события мы будем выдавать результат во внешнюю систему только в случае наличия данных по ключу во всех таблицах.
clients .join(companies, $"client.clientId" === $"company.clientId") .join(payments, $"client.clientId" === $"payment.clientId") .select("client.clientId", "name", "surname", "companyId", "companyName", "amount", "timestamp") .toChangelogStream(Schema.derived(), ChangelogMode.upsert()) .print("Portfolio")
Использование State Processor API
Снова начнем с окружения, в этом варианте нам будет достаточно только окружения для работы в потоковом режиме:
implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
В прошлом варианте мы приводили все наши сущности к типу Table, сейчас нам нужно сделать нечто похожее:
case class PortfolioState( id: Int, client: Option[Client.Value], company: Option[ClientCompany.Value], payment: Option[Payment.Value]) { def complete: Option[Portfolio] = { for { client <- client company <- company payment <- payment } yield { Portfolio( client.clientId, client.name, client.surname, company.companyId, company.companyName, payment.amount, payment.timestamp ) } } }
Мы описали свой собственный аналог таблицы, добавив в него метод complete формирующий результат только в случае наличия данных по всем трем сущностям (аналог inner join). Опишем способ трансформации исходной сущности в наш тип (для остальных сущностей будет аналогично):
object PortfolioState { def apply(client: Client): PortfolioState = { val id = client.before.map(_.clientId).getOrElse(client.after.map(_.clientId).get) PortfolioState(id, client.before.orElse(client.after), None, None) } }
Итоговый результат:
val clients: DataStream[PortfolioState] = KafkaConsumerSource.configureKafkaDataSource[Client]("clients").map(PortfolioState(_))
Последнее чего нам не хватает — это обработчика событий, который будет поддерживать актуальное состояние и выдавать результаты:
class StateProcessor extends KeyedProcessFunction[Int, PortfolioState, Portfolio] { private var state: ValueState[PortfolioState] = _ override def open(parameters: Configuration): Unit = { state = getRuntimeContext.getState( new ValueStateDescriptor[PortfolioState]("state", createTypeInformation[PortfolioState]) ) } override def processElement( value: PortfolioState, ctx: KeyedProcessFunction[Int, PortfolioState, Portfolio]#Context, ut: Collector[Portfolio] ): Unit = { Option(state.value()).fold { state.update(value) // если state == null, инициализируем его входным значением } { currentState => val updatedState = currentState.copy( client = value.client.orElse(currentState.client), company = value.company.orElse(currentState.company), payment = value.payment.orElse(currentState.payment) ) updatedState.complete.foreach(out.collect) // если данных хватает, то выдаем результат state.update(updatedState) } } }
Соберем все вместе:
clients .union(companies) // объединяем все три потока в один .union(payments) .keyBy(_.id) // шардиурем данные по полю id (clientId) .process(new PortfolioStateProcessor()) .print("Portfolio")
Итоги
Мы рассмотрели 2 разных подхода, даже несмотря на всю простоту постановки подход с применением Table API для задач отслеживания изменений и моментальной выдачи результат выглядит привлекательнее, мы получаем возможность описывать желаемый результат на привычном SQL-подобном синтаксисе находясь в бесконечном потоке событий.
Полный код проекта можно найти здесь
ссылка на оригинал статьи https://habr.com/ru/companies/neoflex/articles/745730/
Добавить комментарий