{"id":349722,"date":"2023-07-04T09:04:43","date_gmt":"2023-07-04T09:04:43","guid":{"rendered":"http:\/\/savepearlharbor.com\/?p=349722"},"modified":"-0001-11-30T00:00:00","modified_gmt":"-0001-11-29T21:00:00","slug":"","status":"publish","type":"post","link":"https:\/\/savepearlharbor.com\/?p=349722","title":{"rendered":"<span>\u041a\u0430\u043a Flink Table API \u0443\u043f\u0440\u043e\u0449\u0430\u0435\u0442 \u0440\u0430\u0437\u0440\u0430\u0431\u043e\u0442\u043a\u0443<\/span>"},"content":{"rendered":"<div><\/div>\n<div id=\"post-content-body\">\n<div>\n<div class=\"article-formatted-body article-formatted-body article-formatted-body_version-2\">\n<div xmlns=\"http:\/\/www.w3.org\/1999\/xhtml\">\n<figure class=\"\"><img decoding=\"async\" src=\"https:\/\/habrastorage.org\/r\/w780q1\/getpro\/habr\/post_images\/e18\/b73\/c30\/e18b73c300a0c09dad1ff95b60d2a883.jpg\" data-src=\"https:\/\/habrastorage.org\/getpro\/habr\/post_images\/e18\/b73\/c30\/e18b73c300a0c09dad1ff95b60d2a883.jpg\" data-blurred=\"true\"\/><\/figure>\n<p>\u0412\u0441\u0435\u043c \u043f\u0440\u0438\u0432\u0435\u0442!<\/p>\n<p>\u0412 \u044d\u0442\u043e\u0439 \u0441\u0442\u0430\u0442\u044c\u0435 \u043c\u044b \u043f\u043e\u0441\u043c\u043e\u0442\u0440\u0438\u043c \u043d\u0430 \u0434\u0432\u0430 \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0430 \u0440\u0435\u0448\u0435\u043d\u0438\u044f &#171;\u0442\u0438\u043f\u0438\u0447\u043d\u043e\u0439&#187; \u0437\u0430\u0434\u0430\u0447\u0438 \u043f\u043e\u0442\u043e\u043a\u043e\u0432\u043e\u0439 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0438 \u0441 \u0443\u0447\u0435\u0442\u043e\u043c \u0441\u043e\u0441\u0442\u043e\u044f\u043d\u0438\u044f \u043f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438 \u0444\u0440\u0435\u0439\u043c\u0432\u043e\u0440\u043a\u0430 <em>Apache Flink<\/em>.<\/p>\n<p>\u0415\u0441\u043b\u0438 \u0432\u044b \u0442\u043e\u043b\u044c\u043a\u043e \u043d\u0430\u0447\u0438\u043d\u0430\u0435\u0442\u0435 \u0437\u043d\u0430\u043a\u043e\u043c\u0441\u0442\u0432\u043e \u0441 <em>Flink<\/em>, <a href=\"https:\/\/www.youtube.com\/@flinkforward\">\u0437\u0434\u0435\u0441\u044c<\/a> \u043c\u043e\u0436\u043d\u043e \u043d\u0430\u0439\u0442\u0438 \u043e\u0431\u0438\u043b\u044c\u043d\u043e\u0435 \u043a\u043e\u043b\u0438\u0447\u0435\u0441\u0442\u0432\u043e \u043c\u0430\u0442\u0435\u0440\u0438\u0430\u043b\u043e\u0432.<\/p>\n<h2>\u041f\u043e\u0441\u0442\u0430\u043d\u043e\u0432\u043a\u0430 \u0437\u0430\u0434\u0430\u0447\u0438<\/h2>\n<p>\u0418\u0442\u0430\u043a, \u0434\u0430\u0432\u0430\u0439\u0442\u0435 \u043f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u0438\u043c \u0431\u0430\u0437\u0443 \u0434\u0430\u043d\u043d\u044b\u0445 (\u043f\u0443\u0441\u0442\u044c \u044d\u0442\u043e \u0431\u0443\u0434\u0435\u0442 <em>PostgreSQL<\/em>) \u0441\u043e\u0441\u0442\u043e\u044f\u0449\u0443\u044e \u0438\u0437 \u0442\u0440\u0435\u0445 \u0442\u0430\u0431\u043b\u0438\u0446:<\/p>\n<ol>\n<li>\n<p><em>Clients<\/em> &#8212; \u043e\u0431\u0449\u0430\u044f \u0438\u043d\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u044f \u043f\u043e \u043a\u043b\u0438\u0435\u043d\u0442\u0443:<\/p>\n<\/li>\n<\/ol>\n<div>\n<div class=\"table\">\n<table>\n<tbody>\n<tr>\n<th>\n<p align=\"left\">\u0418\u043c\u044f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u0422\u0438\u043f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435<\/p>\n<\/th>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">clientId<\/p>\n<\/td>\n<td>\n<p align=\"left\">int<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0443\u043d\u0438\u043a\u0430\u043b\u044c\u043d\u044b\u0439 \u0438\u0434\u0435\u043d\u0442\u0438\u0444\u0438\u043a\u0430\u0442\u043e\u0440 \u043a\u043b\u0438\u0435\u043d\u0442\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">name<\/p>\n<\/td>\n<td>\n<p align=\"left\">string<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0438\u043c\u044f \u043a\u043b\u0438\u0435\u043d\u0442\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">surname<\/p>\n<\/td>\n<td>\n<p align=\"left\">string<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0444\u0430\u043c\u0438\u043b\u0438\u044f \u043a\u043b\u0438\u0435\u043d\u0442\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">patronymic<\/p>\n<\/td>\n<td>\n<p align=\"left\">optional[string]<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u043e\u0442\u0447\u0435\u0441\u0442\u0432\u043e \u043a\u043b\u0438\u0435\u043d\u0442\u0430 (\u043d\u0435\u043e\u0431\u044f\u0437\u0430\u0442\u0435\u043b\u044c\u043d\u043e\u0435 \u043f\u043e\u043b\u0435)<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">sex<\/p>\n<\/td>\n<td>\n<p align=\"left\">string<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u043f\u043e\u043b<\/p>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<ol start=\"2\">\n<li>\n<p><em>ClientCompany<\/em> &#8212; \u0438\u043d\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u044f \u043e \u043a\u043e\u043c\u043f\u0430\u043d\u0438\u0438 \u043a\u043b\u0438\u0435\u043d\u0442\u0430:<\/p>\n<\/li>\n<\/ol>\n<div>\n<div class=\"table\">\n<table>\n<tbody>\n<tr>\n<th>\n<p align=\"left\">\u0418\u043c\u044f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u0422\u0438\u043f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435<\/p>\n<\/th>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">clientId<\/p>\n<\/td>\n<td>\n<p align=\"left\">int<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0443\u043d\u0438\u043a\u0430\u043b\u044c\u043d\u044b\u0439 \u0438\u0434\u0435\u043d\u0442\u0438\u0444\u0438\u043a\u0430\u0442\u043e\u0440 \u043a\u043b\u0438\u0435\u043d\u0442\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">companyId<\/p>\n<\/td>\n<td>\n<p align=\"left\">int<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0443\u043d\u0438\u043a\u0430\u043b\u044c\u043d\u044b\u0439 \u0438\u0434\u0435\u043d\u0442\u0438\u0444\u0438\u043a\u0430\u0442\u043e\u0440 \u043a\u043e\u043c\u043f\u0430\u043d\u0438\u0438<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">companyName<\/p>\n<\/td>\n<td>\n<p align=\"left\">string<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u043d\u0430\u0437\u0432\u0430\u043d\u0438\u0435 \u043a\u043e\u043c\u043f\u0430\u043d\u0438\u0438<\/p>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<ol start=\"3\">\n<li>\n<p><em>Payment<\/em> &#8212; \u0438\u043d\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u044f \u043e \u043f\u043b\u0430\u0442\u0435\u0436\u0430\u0445 \u043a\u043b\u0438\u0435\u043d\u0442\u0430:<\/p>\n<\/li>\n<\/ol>\n<div>\n<div class=\"table\">\n<table>\n<tbody>\n<tr>\n<th>\n<p align=\"left\">\u0418\u043c\u044f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u0422\u0438\u043f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435<\/p>\n<\/th>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">clientId<\/p>\n<\/td>\n<td>\n<p align=\"left\">int<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0443\u043d\u0438\u043a\u0430\u043b\u044c\u043d\u044b\u0439 \u0438\u0434\u0435\u043d\u0442\u0438\u0444\u0438\u043a\u0430\u0442\u043e\u0440 \u043a\u043b\u0438\u0435\u043d\u0442\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">amount<\/p>\n<\/td>\n<td>\n<p align=\"left\">int<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0441\u0443\u043c\u043c\u0430 \u043f\u043b\u0430\u0442\u0435\u0436\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">tmMs<\/p>\n<\/td>\n<td>\n<p align=\"left\">long<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0432\u0440\u0435\u043c\u044f \u0441\u0432\u0435\u0440\u0448\u0435\u043d\u0438\u044f \u043f\u043b\u0430\u0442\u0435\u0436\u0430<\/p>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<p>\u0411\u0443\u0434\u0435\u043c \u043f\u043e\u043b\u0443\u0447\u0430\u0442\u044c \u043e\u0431\u043d\u043e\u0432\u043b\u0435\u043d\u0438\u044f \u043f\u043e \u0432\u0441\u0435\u043c \u0442\u0430\u0431\u043b\u0438\u0446\u0430\u043c \u0447\u0435\u0440\u0435\u0437 <a href=\"https:\/\/ru.wikipedia.org\/wiki\/%D0%97%D0%B0%D1%85%D0%B2%D0%B0%D1%82_%D0%B8%D0%B7%D0%BC%D0%B5%D0%BD%D0%B5%D0%BD%D0%B8%D1%8F_%D0%B4%D0%B0%D0%BD%D0%BD%D1%8B%D1%85\">CDC<\/a>-\u043a\u0430\u043d\u0430\u043b (\u043f\u0443\u0441\u0442\u044c \u044d\u0442\u043e \u0431\u0443\u0434\u0435\u0442 <a href=\"https:\/\/debezium.io\/\">Debezium<\/a>) \u0432 \u0442\u043e\u043f\u0438\u043a <a href=\"https:\/\/kafka.apache.org\/\">Kafka<\/a> \u0432 \u0444\u043e\u0440\u043c\u0430\u0442\u0435 <a href=\"https:\/\/www.json.org\/json-en.html\">Json<\/a>. \u0417\u0430\u0434\u0430\u0447\u0435\u0439 \u043d\u0430\u0448\u0435\u0439 <em>Job<\/em>-\u044b \u0431\u0443\u0434\u0435\u0442 \u0441\u043b\u0443\u0448\u0430\u0442\u044c \u0442\u043e\u043f\u0438\u043a\u0438 \u0438 \u0432\u044b\u0434\u0430\u0432\u0430\u0442\u044c \u043e\u0431\u043d\u043e\u0432\u043b\u0435\u043d\u0438\u044f \u043f\u043e \u043a\u043b\u0438\u0435\u043d\u0442\u0443 (\u043d\u0430\u043f\u0440\u0438\u043c\u0435\u0440: \u043a\u043b\u0438\u0435\u043d\u0442 \u0441\u043c\u0435\u043d\u0438\u043b \u043a\u043e\u043c\u043f\u0430\u043d\u0438\u044e) \u0432\u043e \u0432\u043d\u0435\u0448\u043d\u044e\u044e \u0441\u0438\u0441\u0442\u0435\u043c\u0443 (\u0432 \u0440\u0430\u043c\u043a\u0430\u0445 \u0441\u0442\u0430\u0442\u044c\u0438 \u0432 \u043a\u0430\u0447\u0435\u0441\u0442\u0432\u0435 \u0432\u043d\u0435\u0448\u043d\u0435\u0439 \u0441\u0438\u0441\u0442\u0435\u043c\u044b \u0431\u0443\u0434\u0435\u043c \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c \u043a\u043e\u043d\u0441\u043e\u043b\u044c \u0432\u044b\u0432\u043e\u0434\u0430).<\/p>\n<figure class=\"\"><img decoding=\"async\" src=\"https:\/\/habrastorage.org\/r\/w1560\/getpro\/habr\/post_images\/9b7\/649\/69c\/9b764969ca28e62db9380a39270a67da.png\" alt=\"\u0418\u0437\u043e\u0431\u0440\u0430\u0436\u0435\u043d\u0438\u0435\" title=\"\u0420\u0438\u0441\u0443\u043d\u043e\u043a 1 - \u0421\u0445\u0435\u043c\u0430 \u0431\u0438\u0437\u043d\u0435\u0441\u0441 \u043f\u0440\u043e\u0446\u0435\u0441\u0441\u0430\" data-src=\"https:\/\/habrastorage.org\/getpro\/habr\/post_images\/9b7\/649\/69c\/9b764969ca28e62db9380a39270a67da.png\"\/><\/p>\n<div><figcaption>\u0420\u0438\u0441\u0443\u043d\u043e\u043a 1 &#8212; \u0421\u0445\u0435\u043c\u0430 \u0431\u0438\u0437\u043d\u0435\u0441\u0441 \u043f\u0440\u043e\u0446\u0435\u0441\u0441\u0430<\/figcaption><\/div>\n<\/figure>\n<h2>\u041d\u0430\u0447\u0438\u043d\u0430\u0435\u043c \u043f\u0440\u043e\u0433\u0440\u0430\u043c\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c<\/h2>\n<p>\u041d\u0430\u0448 \u0442\u0435\u0445\u043d\u043e\u043b\u043e\u0433\u0438\u0447\u0435\u0441\u043a\u0438\u0439 \u0441\u0442\u0435\u043a:<\/p>\n<ol>\n<li>\n<p><a href=\"https:\/\/docs.scala-lang.org\/tour\/tour-of-scala.html\">Scala<\/a> v2.12.10<\/p>\n<\/li>\n<li>\n<p><a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-stable\/\">Flink<\/a> v1.16.0<\/p>\n<\/li>\n<li>\n<p><a href=\"https:\/\/circe.github.io\/circe\/\">Circe<\/a> v0.14.3<\/p>\n<\/li>\n<\/ol>\n<p>\u041c\u044b \u0440\u0430\u0437\u0431\u0435\u0440\u0435\u043c 2 \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0430 \u0440\u0435\u0448\u0435\u043d\u0438\u044f:<\/p>\n<ol>\n<li>\n<p>\u041f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438 <a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-release-1.16\/docs\/dev\/table\/overview\/\">Table API<\/a><\/p>\n<\/li>\n<li>\n<p>\u041f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438 \u043a\u043b\u0430\u0441\u0441\u0438\u0447\u0435\u0441\u043a\u043e\u0433\u043e <a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-release-1.16\/docs\/libs\/state_processor_api\/#state-processor-api\">State Processor API<\/a><\/p>\n<\/li>\n<\/ol>\n<p>\u041f\u0440\u0435\u0436\u0434\u0435 \u0447\u0435\u043c \u043f\u0438\u0441\u0430\u0442\u044c \u043e\u0441\u043d\u043e\u0432\u043d\u0443\u044e \u043b\u043e\u0433\u0438\u043a\u0443 \u043d\u0430\u043c \u043f\u043e\u0442\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0440\u0435\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u0442\u044c \u043d\u0435\u0441\u043a\u043e\u043b\u044c\u043a\u043e \u043e\u0431\u0449\u0438\u0445 \u0448\u0430\u0433\u043e\u0432, \u043a\u043e\u0442\u043e\u0440\u044b\u043c\u0438 \u043c\u044b \u0431\u0443\u0434\u0435\u043c \u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c\u0441\u044f \u0432 \u043e\u0431\u0435\u0438\u0445 \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044f\u0445.<\/p>\n<p>\u041f\u0435\u0440\u0432\u043e\u0435 \u0447\u0442\u043e \u043d\u0430\u043c \u043f\u043e\u0442\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f &#8212; \u044d\u0442\u043e \u043d\u0430\u0443\u0447\u0438\u0442\u044c\u0441\u044f \u0447\u0438\u0442\u0430\u0442\u044c \u0442\u043e\u043f\u0438\u043a\u0438 <em>Kafka<\/em>, \u0432 \u044d\u0442\u043e\u043c \u043d\u0430\u043c \u043f\u043e\u043c\u043e\u0436\u0435\u0442 \u0443\u0436\u0435 \u0433\u043e\u0442\u043e\u0432\u044b\u0439 <a href=\"https:\/\/alpinegizmo.github.io\/flink-docs\/dev\/connectors\/kafka.html\">Flink Kafka Connector<\/a>:<\/p>\n<pre><code class=\"scala\">sealed trait KafkaConsumerSource {    def configure[A: Decoder: ClassTag](bootstrapServers: String, topicName: String): KafkaSource[A] = {     KafkaSource       .builder()       .setBootstrapServers(bootstrapServers)       .setGroupId(s\"$topicName-group\")       .setTopics(topicName)       .setDeserializer(new KafkaJsonRecordDeserializer[A])       .setStartingOffsets(OffsetsInitializer.earliest())       .build()   } }  object KafkaConsumerSource extends KafkaConsumerSource {    def configureKafkaDataSource[A: Decoder: ClassTag](       topicName: String   )(implicit env: StreamExecutionEnvironment,     typeInformation: TypeInformation[A]): DataStream[A] = {     val kafkaSource = KafkaConsumerSource.configure[A](Kafka.BootstrapServers, topicName)     env.fromSource[A](kafkaSource, WatermarkStrategy.noWatermarks[A](), s\"$topicName-flow\")   } }  class KafkaJsonRecordDeserializer[A: Decoder: ClassTag] extends KafkaRecordDeserializationSchema[A] {    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[A]): Unit = {     val value = new String(record.value())     val obj   = Serde.toObj(value).toOption \/\/ \u0432 \u0441\u043b\u0443\u0447\u0430\u0435 \u043d\u0435\u0443\u0434\u0430\u0447\u0438 \u043f\u0440\u0438 \u0434\u0435\u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438, \u043e\u0448\u0438\u0431\u043a\u0430 \u043f\u0440\u043e\u0441\u0442\u043e \u0438\u0433\u043d\u043e\u0440\u0438\u0440\u0443\u0435\u0442\u0441\u044f \u0438 \u0441\u043e\u0431\u044b\u0442\u0438\u0435 \u043e\u0442\u0431\u0440\u0430\u0441\u044b\u0432\u0430\u0435\u0442\u0441\u044f (\u0432 \u0440\u0435\u0430\u043b\u044c\u043d\u044b\u0445 \u043f\u0440\u043e\u0435\u043a\u0442\u0430\u0445 \u0442\u0430\u043a \u0434\u0435\u043b\u0430\u0442\u044c \u043d\u0435 \u0441\u0442\u043e\u0438\u0442)     obj.foreach(out.collect)   }    override def getProducedType: TypeInformation[A] = {     TypeExtractor       .getForClass(classTag[A].runtimeClass.asInstanceOf[Class[A]])       .asInstanceOf[TypeInformation[A]]   } } <\/code><\/pre>\n<p>\u0412 \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438\/\u0434\u0435\u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u043d\u0430\u043c \u043f\u043e\u043c\u043e\u0436\u0435\u0442 <em>Circe<\/em>:<\/p>\n<pre><code class=\"scala\">object Serde {    def toObj[A: Decoder](json: String): Either[circe.Error, A] = decode[A](json)    def toJson[A: Encoder](obj: A): String = obj.asJson.toString() } <\/code><\/pre>\n<p>\u041c\u044b \u043d\u0435\u043c\u043d\u043e\u0433\u043e \u0443\u043f\u0440\u043e\u0441\u0442\u0438\u043c \u0441\u0435\u0431\u0435 \u0437\u0430\u0434\u0430\u0447\u0443, \u0441\u044b\u043c\u0438\u0442\u0438\u0440\u0443\u0435\u043c \u043f\u043e\u0432\u0435\u0434\u0435\u043d\u0438\u0435 <em>CDC<\/em>-\u043a\u0430\u043d\u0430\u043b\u0430, \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u044f\u044f \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u044f \u0442\u0440\u0435\u0431\u0443\u0435\u043c\u043e\u0433\u043e \u0444\u043e\u0440\u043c\u0430\u0442\u0430 \u043d\u0430\u043f\u0440\u044f\u043c\u0443\u044e \u0432 \u0442\u043e\u043f\u0438\u043a <em>Kafka<\/em>:<\/p>\n<pre><code class=\"scala\">case class Client (before: Option[Client.Value], after: Option[Client.Value], op: String)  object Client {   case class Value(clientId: Int, name: String, surname: String, patronymic: Option[String], sex: String) }  case class ClientCompany (before: Option[ClientCompany.Value], after: Option[ClientCompany.Value], op: String)  object ClientCompany {   case class Value(clientId: Int, companyId: Int, companyName: String) }  case class Payment (before: Option[Payment.Value], after: Option[Payment.Value], op: String)  object Payment {   case class Value(clientId: Int, amount: Int, tmMs: Long) }  <\/code><\/pre>\n<p>\u0422\u0435\u043f\u0435\u0440\u044c \u043c\u044b \u0433\u043e\u0442\u043e\u0432\u044b \u043f\u0440\u0438\u0441\u0442\u0443\u043f\u0438\u0442\u044c \u043a \u043e\u0441\u043d\u043e\u0432\u043d\u043e\u0439 \u0447\u0430\u0441\u0442\u0438.<\/p>\n<h2>\u0418\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435 Flink Table API<\/h2>\n<p>\u0418\u0442\u0430\u043a, \u043f\u0440\u0435\u0436\u0434\u0435 \u0432\u0441\u0435\u0433\u043e \u043d\u0430\u043c \u043d\u0443\u0436\u043d\u043e \u043f\u043e\u043b\u0443\u0447\u0438\u0442\u044c \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e\u0435 \u043e\u043a\u0440\u0443\u0436\u0435\u043d\u0438\u0435, \u043f\u043e\u0441\u043a\u043e\u043b\u044c\u043a\u0443 \u043c\u044b \u0431\u0443\u0434\u0435\u043c \u0440\u0430\u0431\u043e\u0442\u0430\u0442\u044c \u0432 \u043f\u043e\u0442\u043e\u043a\u043e\u0432\u043e\u043c \u0440\u0435\u0436\u0438\u043c\u0435, \u0437\u0430\u0440\u0443\u0447\u0438\u043c\u0441\u044f \u0442\u0430\u043a\u043e\u0432\u044b\u043c:<\/p>\n<pre><code class=\"scala\">implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment <\/code><\/pre>\n<p>\u0414\u0430\u043b\u0435\u0435 \u043f\u043e\u043b\u0443\u0447\u0438\u043c \u043e\u043a\u0440\u0443\u0436\u0435\u043d\u0438\u0435 \u0434\u043b\u044f \u0434\u043e\u0441\u0442\u0443\u043f\u0430 \u043a <em>Table API<\/em> :<\/p>\n<pre><code class=\"scala\">implicit val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) <\/code><\/pre>\n<p>\u0424\u043e\u0440\u043c\u0438\u0440\u0443\u0435\u043c <em>Kafka Source<\/em> \u043f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438 <a href=\"#%D0%9D%D0%B0%D1%87%D0%B8%D0%BD%D0%B0%D0%B5%D0%BC-%D0%BF%D1%80%D0%BE%D0%B3%D1%80%D0%B0%D0%BC%D0%BC%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D1%82%D1%8C\">\u043a\u043e\u0434\u0430<\/a> \u043a\u043e\u0442\u043e\u0440\u044b\u0439 \u043c\u044b \u043f\u043e\u0434\u0433\u043e\u0442\u043e\u0432\u0438\u043b\u0438 \u0432\u044b\u0448\u0435:<\/p>\n<pre><code class=\"scala\">val clients: DataStream[Client] = KafkaConsumerSource.configureKafkaDataSource[Client](\"clients\") <\/code><\/pre>\n<p>\u0422\u0435\u043f\u0435\u0440\u044c \u043d\u0430\u043c \u043e\u0441\u0442\u0430\u043b\u0441\u044f \u0432\u0441\u0435\u0433\u043e \u043e\u0434\u0438\u043d \u0448\u0430\u0433 \u0434\u043e \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u0438\u044f \u043f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u043b\u0435\u043d\u0438\u044f \u0441 \u043a\u043e\u0442\u043e\u0440\u044b\u043c \u043d\u0430\u043c \u043f\u0440\u0435\u0434\u0441\u0442\u043e\u0438\u0442 \u0440\u0430\u0431\u043e\u0442\u0430\u0442\u044c &#8212; <a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-release-1.17\/docs\/dev\/table\/concepts\/dynamic_tables\/\">Dynamic Tables<\/a>.  \u0427\u0442\u043e\u0431\u044b \u0441\u0434\u0435\u043b\u0430\u0442\u044c \u044d\u0442\u043e\u0442 \u0448\u0430\u0433 \u043c\u0430\u043a\u0441\u0438\u043c\u0430\u043b\u044c\u043d\u043e \u043f\u0440\u043e\u0441\u0442\u044b\u043c, \u0432\u043e\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c\u0441\u044f \u0432\u043e\u0437\u043c\u043e\u0436\u043d\u043e\u0441\u0442\u044f\u043c\u0438 <a href=\"https:\/\/docs.scala-lang.org\/overviews\/core\/implicit-classes.html\">Scala<\/a> \u0438 \u0440\u0430\u0441\u0448\u0438\u0440\u0438\u043c <em>DataStream<\/em> \u043c\u0435\u0442\u043e\u0434\u043e\u043c <em>toStreamTable<\/em>:<\/p>\n<pre><code class=\"scala\">implicit class DataStreamOps[A: RowTransformer](val stream: DataStream[A]) {    def toStreamTable(primaryKey: String*)(implicit env: StreamTableEnvironment): Table = {     val transformer = RowTransformer[A]     stream       .map(transformer.toRow(_))       .flatMap(_.toSeq)(transformer.typeInformation)       .toChangelogTable(         env,         Schema           .newBuilder()           .primaryKey(primaryKey: _*)           .build(),         ChangelogMode.upsert()       )   } } <\/code><\/pre>\n<p>\u041d\u0430 \u0432\u0445\u043e\u0434 \u0444\u0443\u043d\u043a\u0446\u0438\u0438 \u043f\u043e\u0434\u0430\u0435\u0442\u0441\u044f \u0441\u043f\u0438\u0441\u043e\u043a \u0438\u043c\u0435\u043d \u043f\u043e\u043b\u0435\u0439, \u043a\u043e\u0442\u043e\u0440\u044b\u0435 \u0441\u043e\u0441\u0442\u0430\u0432\u044f\u0442 \u043f\u0435\u0440\u0432\u0438\u0447\u043d\u044b\u0439 \u043a\u043b\u044e\u0447 \u043d\u0430\u0448\u0435\u0439 \u0442\u0430\u0431\u043b\u0438\u0446\u044b. \u041a\u0430\u043a \u0438 \u0432 \u0431\u043e\u043b\u044c\u0448\u0438\u043d\u0441\u0442\u0432\u0435 \u0444\u0440\u0435\u0439\u043c\u0432\u043e\u0440\u043a\u043e\u0432 \u043d\u0430\u0438\u043c\u0435\u043d\u044c\u0448\u0435\u0439 \u0435\u0434\u0438\u043d\u0438\u0446\u0435\u0439 \u0434\u043b\u044f \u0440\u0430\u0431\u043e\u0442\u044b \u0441 \u0442\u0430\u0431\u043b\u0438\u0446\u0430\u043c\u0438 \u044f\u0432\u043b\u044f\u0435\u0442\u0441\u044f \u0441\u0442\u0440\u043e\u043a\u0430 (<a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-stable\/api\/java\/org\/apache\/flink\/types\/Row.html\">Row<\/a>). \u041d\u0443\u0436\u0435\u043d \u0441\u043f\u043e\u0441\u043e\u0431 \u043f\u0440\u0435\u043e\u0431\u0440\u0430\u0437\u043e\u0432\u0430\u043d\u0438\u044f \u0432\u0441\u0435\u0445 \u043d\u0430\u0448\u0438\u0445 \u0441\u0443\u0449\u043d\u043e\u0441\u0442\u0435\u0439 \u0432 \u0442\u0438\u043f <em>Row<\/em>, \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0438\u043c \u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c\u0441\u044f \u0432\u043e\u0437\u043c\u043e\u0436\u043d\u043e\u0441\u0442\u044f\u043c\u0438 <em>Scala<\/em>, \u043d\u0430 \u044d\u0442\u043e\u0442 \u0440\u0430\u0437 \u0442\u0435\u043c \u0447\u0442\u043e <em>Scala<\/em> \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u0435\u0442 \u043f\u0438\u0441\u0430\u0442\u044c \u043a\u043e\u0434 \u0432 <a href=\"https:\/\/ru.wikipedia.org\/wiki\/%D0%A4%D1%83%D0%BD%D0%BA%D1%86%D0%B8%D0%BE%D0%BD%D0%B0%D0%BB%D1%8C%D0%BD%D0%BE%D0%B5_%D0%BF%D1%80%D0%BE%D0%B3%D1%80%D0%B0%D0%BC%D0%BC%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D0%B5\">\u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u044c\u043d\u043e\u043c \u0441\u0442\u0438\u043b\u0435<\/a>, \u0430 \u043a\u043e\u043d\u043a\u0440\u0435\u0442\u043d\u043e \u043c\u044b \u0432\u043e\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c\u0441\u044f <a href=\"https:\/\/habr.com\/ru\/companies\/tinkoff\/articles\/147759\/\">Type Class<\/a>-\u043c\u0438. \u041d\u0430\u0448 <em>Type Class<\/em> \u0431\u0443\u0434\u0435\u0442 \u0438\u043c\u0435\u0442\u044c \u0441\u043b\u0435\u0434\u0443\u044e\u0449\u0438\u0439 \u0432\u0438\u0434:<\/p>\n<pre><code class=\"scala\">sealed trait RowTransformer[A] {   def toRow(entity: A): Option[Row]   protected val fieldsName: Array[String]   implicit def typeInformation: TypeInformation[Row] } <\/code><\/pre>\n<p>\u041f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u0438\u043c \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044e \u0434\u043b\u044f \u043e\u0434\u043d\u043e\u0439 \u0438\u0437 \u0442\u0430\u0431\u043b\u0438\u0446 (\u0434\u043b\u044f \u043e\u0441\u0442\u0430\u043b\u044c\u043d\u044b\u0445 \u0430\u043b\u0433\u043e\u0440\u0438\u0442\u043c \u0431\u0443\u0434\u0435\u0442 \u0430\u043d\u0430\u043b\u043e\u0433\u0438\u0447\u0435\u043d):<\/p>\n<pre><code class=\"scala\">implicit case object ClientRow extends RowTransformer[Client] {   override def toRow(entity: Client): Option[Row] = {     val kind   = kindOf(entity.operation)     val record = entity.before.orElse(entity.after)     record.map(r => Row.ofKind(kind, Int.box(r.clientId), r.name, r.surname, r.patronymic.orNull, r.sex))   }    override implicit def typeInformation: TypeInformation[Row] = {     Types.ROW_NAMED(fieldsName, Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)   }    override protected val fieldsName: Array[String] = Array(\"clientId\", \"name\", \"surname\", \"patronymic\", \"sex\") } <\/code><\/pre>\n<p>\u0422\u0435\u043f\u0435\u0440\u044c \u043c\u044b \u043c\u043e\u0436\u0435\u043c \u043f\u0440\u0435\u043e\u0431\u0440\u0430\u0437\u043e\u0432\u044b\u0432\u0430\u0442\u044c <code>DataStream<\/code> \u0432 <code>Table<\/code> \u043e\u0434\u043d\u0438\u043c \u0434\u0435\u0439\u0441\u0442\u0432\u0438\u0435\u043c:<\/p>\n<pre><code class=\"scala\">val clients: Table = KafkaConsumerSource.configureKafkaDataSource[Client](\"clients\")   .toStreamTable(\"clientId\") <\/code><\/pre>\n<p>\u042d\u0442\u043e \u0434\u0430\u043b\u0435\u043a\u043e \u043d\u0435 \u0435\u0434\u0438\u043d\u0441\u0442\u0432\u0435\u043d\u043d\u044b\u0439 \u0441\u043f\u043e\u0441\u043e\u0431 \u0441\u043e\u0437\u0434\u0430\u043d\u0438\u044f \u0442\u0430\u0431\u043b\u0438\u0446, \u0435\u0441\u043b\u0438 \u043c\u044b \u0438\u0437\u043d\u0430\u0447\u0430\u043b\u044c\u043d\u043e \u0445\u043e\u0442\u0438\u043c \u0440\u0430\u0431\u043e\u0442\u0430\u0442\u044c \u0441 \u0434\u0430\u043d\u043d\u044b\u043c\u0438 \u043a\u0430\u043a \u0441 \u0442\u0430\u0431\u043b\u0438\u0446\u0435\u0439, \u043d\u0435\u043e\u0431\u044f\u0437\u0430\u0442\u0435\u043b\u044c\u043d\u043e \u0441\u043f\u0435\u0440\u0432\u0430 \u043f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u043b\u044f\u0442\u044c \u0434\u0430\u043d\u043d\u044b\u0435 \u0432 \u0432\u0438\u0434\u0435 <em>DataStream<\/em>:<\/p>\n<pre><code class=\"scala\">tableEnv.createTable(   \"companies\",   TableDescriptor     .forConnector(\"kafka\")     .schema(       Schema         .newBuilder()         .column(\"clientId\", DataTypes.INT().notNull())         .column(\"companyId\", DataTypes.INT().notNull())         .column(\"companyName\", DataTypes.STRING().notNull())         .primaryKey(\"clientId\")         .build()     )     .option(KafkaConnectorOptions.TOPIC, Seq(\"companies\").asJava)     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, Kafka.BootstrapServers)     .option(KafkaConnectorOptions.PROPS_GROUP_ID, \"companies-group\")     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET)     .option(KafkaConnectorOptions.VALUE_FORMAT, \"debezium-json\")     .build() )  val companies: Table = tableEnv.sqlQuery(\"SELECT * FROM companies\")  companies.print(\"Companies\") <\/code><\/pre>\n<p>\u0418 \u043a\u043e\u043d\u0435\u0447\u043d\u043e \u0436\u0435 \u043a\u0443\u0434\u0430 \u0431\u0435\u0437 <em>SQL<\/em>:<\/p>\n<pre><code class=\"scala\">tableEnv.executeSql(   \"\"\"     |CREATE TABLE payments (clientId INT NOT NULL, amount INT NOT NULL, tmMs BIGINT NOT NULL)     |WITH (     | 'connector' = 'kafka',     | 'topic' = 'payments',     | 'properties.bootstrap.servers' = 'localhost:9092',     | 'properties.group.id' = 'payments-group',     | 'scan.startup.mode' = 'earliest-offset',     | 'format' = 'debezium-json'     |)     |\"\"\".stripMargin)  val payments: Table = tableEnv.sqlQuery(\"SELECT * FROM payments\")  payments.print(\"Payments\") <\/code><\/pre>\n<p>\u041c\u044b \u043e\u043a\u043e\u043d\u0447\u0430\u0442\u0435\u043b\u044c\u043d\u043e \u043f\u043e\u0434\u0433\u043e\u0442\u043e\u0432\u0438\u043b\u0438\u0441\u044c \u043a \u043d\u0430\u043f\u0438\u0441\u0430\u043d\u0438\u044e \u043e\u0441\u043d\u043e\u0432\u043d\u043e\u0439 \u043b\u043e\u0433\u0438\u043a\u0438. \u0414\u043b\u044f \u043d\u0430\u0448\u0435\u0439 \u0437\u0430\u0434\u0430\u0447\u0438 \u0431\u0443\u0434\u0435\u043c \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c <em>inner join<\/em>, \u0442.\u0435. \u043f\u0440\u0438 \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u0438\u0438 \u043d\u043e\u0432\u043e\u0433\u043e \u0441\u043e\u0431\u044b\u0442\u0438\u044f \u043c\u044b \u0431\u0443\u0434\u0435\u043c \u0432\u044b\u0434\u0430\u0432\u0430\u0442\u044c \u0440\u0435\u0437\u0443\u043b\u044c\u0442\u0430\u0442 \u0432\u043e <em>\u0432\u043d\u0435\u0448\u043d\u044e\u044e \u0441\u0438\u0441\u0442\u0435\u043c\u0443<\/em> \u0442\u043e\u043b\u044c\u043a\u043e \u0432 \u0441\u043b\u0443\u0447\u0430\u0435 \u043d\u0430\u043b\u0438\u0447\u0438\u044f \u0434\u0430\u043d\u043d\u044b\u0445 \u043f\u043e \u043a\u043b\u044e\u0447\u0443 \u0432\u043e \u0432\u0441\u0435\u0445 \u0442\u0430\u0431\u043b\u0438\u0446\u0430\u0445.<\/p>\n<pre><code class=\"scala\">clients   .join(companies, $\"client.clientId\" === $\"company.clientId\")   .join(payments, $\"client.clientId\" === $\"payment.clientId\")   .select(\"client.clientId\", \"name\", \"surname\", \"companyId\", \"companyName\", \"amount\", \"timestamp\")   .toChangelogStream(Schema.derived(), ChangelogMode.upsert())   .print(\"Portfolio\") <\/code><\/pre>\n<figure class=\"\"><img decoding=\"async\" src=\"https:\/\/habrastorage.org\/r\/w1560\/getpro\/habr\/post_images\/280\/ac5\/c75\/280ac5c75b53e2c91140464b4a300ef5.png\" alt=\"\u0418\u0437\u043e\u0431\u0440\u0430\u0436\u0435\u043d\u0438\u0435\" title=\"\u0420\u0438\u0441\u0443\u043d\u043e\u043a 2 - Dynamic Tables\" data-src=\"https:\/\/habrastorage.org\/getpro\/habr\/post_images\/280\/ac5\/c75\/280ac5c75b53e2c91140464b4a300ef5.png\"\/><\/p>\n<div><figcaption>\u0420\u0438\u0441\u0443\u043d\u043e\u043a 2 &#8212; Dynamic Tables<\/figcaption><\/div>\n<\/figure>\n<h2>\u0418\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435 State Processor API<\/h2>\n<p>\u0421\u043d\u043e\u0432\u0430 \u043d\u0430\u0447\u043d\u0435\u043c \u0441 \u043e\u043a\u0440\u0443\u0436\u0435\u043d\u0438\u044f, \u0432 \u044d\u0442\u043e\u043c \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0435 \u043d\u0430\u043c \u0431\u0443\u0434\u0435\u0442 \u0434\u043e\u0441\u0442\u0430\u0442\u043e\u0447\u043d\u043e \u0442\u043e\u043b\u044c\u043a\u043e \u043e\u043a\u0440\u0443\u0436\u0435\u043d\u0438\u044f \u0434\u043b\u044f \u0440\u0430\u0431\u043e\u0442\u044b \u0432 \u043f\u043e\u0442\u043e\u043a\u043e\u0432\u043e\u043c \u0440\u0435\u0436\u0438\u043c\u0435:<\/p>\n<pre><code class=\"scala\">implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment <\/code><\/pre>\n<p>\u0412 \u043f\u0440\u043e\u0448\u043b\u043e\u043c \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0435 \u043c\u044b \u043f\u0440\u0438\u0432\u043e\u0434\u0438\u043b\u0438 \u0432\u0441\u0435 \u043d\u0430\u0448\u0438 \u0441\u0443\u0449\u043d\u043e\u0441\u0442\u0438 \u043a \u0442\u0438\u043f\u0443 <em>Table<\/em>, \u0441\u0435\u0439\u0447\u0430\u0441 \u043d\u0430\u043c \u043d\u0443\u0436\u043d\u043e \u0441\u0434\u0435\u043b\u0430\u0442\u044c \u043d\u0435\u0447\u0442\u043e \u043f\u043e\u0445\u043e\u0436\u0435\u0435:<\/p>\n<pre><code class=\"scala\">case class PortfolioState(    id: Int,    client: Option[Client.Value],    company: Option[ClientCompany.Value],    payment: Option[Payment.Value]) {    def complete: Option[Portfolio] = {     for {       client  &lt;- client       company &lt;- company       payment &lt;- payment     } yield {       Portfolio(         client.clientId,         client.name,         client.surname,         company.companyId,         company.companyName,         payment.amount,         payment.timestamp       )     }   } } <\/code><\/pre>\n<p>\u041c\u044b \u043e\u043f\u0438\u0441\u0430\u043b\u0438 \u0441\u0432\u043e\u0439 \u0441\u043e\u0431\u0441\u0442\u0432\u0435\u043d\u043d\u044b\u0439 \u0430\u043d\u0430\u043b\u043e\u0433 \u0442\u0430\u0431\u043b\u0438\u0446\u044b, \u0434\u043e\u0431\u0430\u0432\u0438\u0432 \u0432 \u043d\u0435\u0433\u043e \u043c\u0435\u0442\u043e\u0434 <em>complete<\/em> \u0444\u043e\u0440\u043c\u0438\u0440\u0443\u044e\u0449\u0438\u0439 \u0440\u0435\u0437\u0443\u043b\u044c\u0442\u0430\u0442 \u0442\u043e\u043b\u044c\u043a\u043e \u0432 \u0441\u043b\u0443\u0447\u0430\u0435 \u043d\u0430\u043b\u0438\u0447\u0438\u044f \u0434\u0430\u043d\u043d\u044b\u0445 \u043f\u043e \u0432\u0441\u0435\u043c \u0442\u0440\u0435\u043c \u0441\u0443\u0449\u043d\u043e\u0441\u0442\u044f\u043c (\u0430\u043d\u0430\u043b\u043e\u0433 <em>inner join<\/em>). \u041e\u043f\u0438\u0448\u0435\u043c \u0441\u043f\u043e\u0441\u043e\u0431 \u0442\u0440\u0430\u043d\u0441\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u0438 \u0438\u0441\u0445\u043e\u0434\u043d\u043e\u0439 \u0441\u0443\u0449\u043d\u043e\u0441\u0442\u0438 \u0432 \u043d\u0430\u0448 \u0442\u0438\u043f (\u0434\u043b\u044f \u043e\u0441\u0442\u0430\u043b\u044c\u043d\u044b\u0445 \u0441\u0443\u0449\u043d\u043e\u0441\u0442\u0435\u0439 \u0431\u0443\u0434\u0435\u0442 \u0430\u043d\u0430\u043b\u043e\u0433\u0438\u0447\u043d\u043e):<\/p>\n<pre><code class=\"scala\">object PortfolioState {    def apply(client: Client): PortfolioState = {     val id = client.before.map(_.clientId).getOrElse(client.after.map(_.clientId).get)     PortfolioState(id, client.before.orElse(client.after), None, None)   } } <\/code><\/pre>\n<p>\u0418\u0442\u043e\u0433\u043e\u0432\u044b\u0439 \u0440\u0435\u0437\u0443\u043b\u044c\u0442\u0430\u0442:<\/p>\n<pre><code class=\"scala\">val clients: DataStream[PortfolioState] = KafkaConsumerSource.configureKafkaDataSource[Client](\"clients\").map(PortfolioState(_)) <\/code><\/pre>\n<p>\u041f\u043e\u0441\u043b\u0435\u0434\u043d\u0435\u0435 \u0447\u0435\u0433\u043e \u043d\u0430\u043c \u043d\u0435 \u0445\u0432\u0430\u0442\u0430\u0435\u0442 &#8212; \u044d\u0442\u043e <a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-stable\/docs\/libs\/state_processor_api\/#keyed-state\">\u043e\u0431\u0440\u0430\u0431\u043e\u0442\u0447\u0438\u043a\u0430<\/a> \u0441\u043e\u0431\u044b\u0442\u0438\u0439, \u043a\u043e\u0442\u043e\u0440\u044b\u0439 \u0431\u0443\u0434\u0435\u0442 \u043f\u043e\u0434\u0434\u0435\u0440\u0436\u0438\u0432\u0430\u0442\u044c \u0430\u043a\u0442\u0443\u0430\u043b\u044c\u043d\u043e\u0435 \u0441\u043e\u0441\u0442\u043e\u044f\u043d\u0438\u0435 \u0438 \u0432\u044b\u0434\u0430\u0432\u0430\u0442\u044c \u0440\u0435\u0437\u0443\u043b\u044c\u0442\u0430\u0442\u044b:<\/p>\n<pre><code class=\"scala\">class StateProcessor extends KeyedProcessFunction[Int, PortfolioState, Portfolio] {    private var state: ValueState[PortfolioState] = _    override def open(parameters: Configuration): Unit = {     state = getRuntimeContext.getState(       new ValueStateDescriptor[PortfolioState](\"state\", createTypeInformation[PortfolioState])     )   }    override def processElement(      value: PortfolioState,      ctx: KeyedProcessFunction[Int, PortfolioState, Portfolio]#Context,      ut: Collector[Portfolio]   ): Unit = {     Option(state.value()).fold {       state.update(value) \/\/ \u0435\u0441\u043b\u0438 state == null, \u0438\u043d\u0438\u0446\u0438\u0430\u043b\u0438\u0437\u0438\u0440\u0443\u0435\u043c \u0435\u0433\u043e \u0432\u0445\u043e\u0434\u043d\u044b\u043c \u0437\u043d\u0430\u0447\u0435\u043d\u0438\u0435\u043c     } { currentState =>       val updatedState = currentState.copy(         client = value.client.orElse(currentState.client),         company = value.company.orElse(currentState.company),         payment = value.payment.orElse(currentState.payment)       )       updatedState.complete.foreach(out.collect) \/\/ \u0435\u0441\u043b\u0438 \u0434\u0430\u043d\u043d\u044b\u0445 \u0445\u0432\u0430\u0442\u0430\u0435\u0442, \u0442\u043e \u0432\u044b\u0434\u0430\u0435\u043c \u0440\u0435\u0437\u0443\u043b\u044c\u0442\u0430\u0442       state.update(updatedState)     }   } } <\/code><\/pre>\n<p>\u0421\u043e\u0431\u0435\u0440\u0435\u043c \u0432\u0441\u0435 \u0432\u043c\u0435\u0441\u0442\u0435:<\/p>\n<pre><code class=\"scala\">clients   .union(companies) \/\/ \u043e\u0431\u044a\u0435\u0434\u0438\u043d\u044f\u0435\u043c \u0432\u0441\u0435 \u0442\u0440\u0438 \u043f\u043e\u0442\u043e\u043a\u0430 \u0432 \u043e\u0434\u0438\u043d   .union(payments)   .keyBy(_.id) \/\/ \u0448\u0430\u0440\u0434\u0438\u0443\u0440\u0435\u043c \u0434\u0430\u043d\u043d\u044b\u0435 \u043f\u043e \u043f\u043e\u043b\u044e id (clientId)   .process(new PortfolioStateProcessor())   .print(\"Portfolio\") <\/code><\/pre>\n<h2>\u0418\u0442\u043e\u0433\u0438<\/h2>\n<p>\u041c\u044b \u0440\u0430\u0441\u0441\u043c\u043e\u0442\u0440\u0435\u043b\u0438 2 \u0440\u0430\u0437\u043d\u044b\u0445 \u043f\u043e\u0434\u0445\u043e\u0434\u0430, \u0434\u0430\u0436\u0435 \u043d\u0435\u0441\u043c\u043e\u0442\u0440\u044f \u043d\u0430 \u0432\u0441\u044e \u043f\u0440\u043e\u0441\u0442\u043e\u0442\u0443 \u043f\u043e\u0441\u0442\u0430\u043d\u043e\u0432\u043a\u0438 \u043f\u043e\u0434\u0445\u043e\u0434 \u0441 \u043f\u0440\u0438\u043c\u0435\u043d\u0435\u043d\u0438\u0435\u043c <em>Table API<\/em> \u0434\u043b\u044f \u0437\u0430\u0434\u0430\u0447 \u043e\u0442\u0441\u043b\u0435\u0436\u0438\u0432\u0430\u043d\u0438\u044f \u0438\u0437\u043c\u0435\u043d\u0435\u043d\u0438\u0439 \u0438 \u043c\u043e\u043c\u0435\u043d\u0442\u0430\u043b\u044c\u043d\u043e\u0439 \u0432\u044b\u0434\u0430\u0447\u0438 \u0440\u0435\u0437\u0443\u043b\u044c\u0442\u0430\u0442 \u0432\u044b\u0433\u043b\u044f\u0434\u0438\u0442 \u043f\u0440\u0438\u0432\u043b\u0435\u043a\u0430\u0442\u0435\u043b\u044c\u043d\u0435\u0435, \u043c\u044b \u043f\u043e\u043b\u0443\u0447\u0430\u0435\u043c \u0432\u043e\u0437\u043c\u043e\u0436\u043d\u043e\u0441\u0442\u044c \u043e\u043f\u0438\u0441\u044b\u0432\u0430\u0442\u044c \u0436\u0435\u043b\u0430\u0435\u043c\u044b\u0439 \u0440\u0435\u0437\u0443\u043b\u044c\u0442\u0430\u0442 \u043d\u0430 \u043f\u0440\u0438\u0432\u044b\u0447\u043d\u043e\u043c <em>SQL<\/em>-\u043f\u043e\u0434\u043e\u0431\u043d\u043e\u043c \u0441\u0438\u043d\u0442\u0430\u043a\u0441\u0438\u0441\u0435 \u043d\u0430\u0445\u043e\u0434\u044f\u0441\u044c \u0432 \u0431\u0435\u0441\u043a\u043e\u043d\u0435\u0447\u043d\u043e\u043c \u043f\u043e\u0442\u043e\u043a\u0435 \u0441\u043e\u0431\u044b\u0442\u0438\u0439.<\/p>\n<p>\u041f\u043e\u043b\u043d\u044b\u0439 \u043a\u043e\u0434 \u043f\u0440\u043e\u0435\u043a\u0442\u0430 \u043c\u043e\u0436\u043d\u043e \u043d\u0430\u0439\u0442\u0438 <a href=\"https:\/\/github.com\/way-tendoo\/flink-table-api-habr-paper\">\u0437\u0434\u0435\u0441\u044c<\/a><\/p>\n<\/div>\n<\/div>\n<\/div>\n<p> <!----> <!----><\/div>\n<p> <!----> <!----><br \/> \u0441\u0441\u044b\u043b\u043a\u0430 \u043d\u0430 \u043e\u0440\u0438\u0433\u0438\u043d\u0430\u043b \u0441\u0442\u0430\u0442\u044c\u0438 <a href=\"https:\/\/habr.com\/ru\/companies\/neoflex\/articles\/745730\/\"> https:\/\/habr.com\/ru\/companies\/neoflex\/articles\/745730\/<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"<div><\/div>\n<div id=\"post-content-body\">\n<div>\n<div class=\"article-formatted-body article-formatted-body article-formatted-body_version-2\">\n<div xmlns=\"http:\/\/www.w3.org\/1999\/xhtml\">\n<figure class=\"\"><\/figure>\n<p>\u0412\u0441\u0435\u043c \u043f\u0440\u0438\u0432\u0435\u0442!<\/p>\n<p>\u0412 \u044d\u0442\u043e\u0439 \u0441\u0442\u0430\u0442\u044c\u0435 \u043c\u044b \u043f\u043e\u0441\u043c\u043e\u0442\u0440\u0438\u043c \u043d\u0430 \u0434\u0432\u0430 \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0430 \u0440\u0435\u0448\u0435\u043d\u0438\u044f &#171;\u0442\u0438\u043f\u0438\u0447\u043d\u043e\u0439&#187; \u0437\u0430\u0434\u0430\u0447\u0438 \u043f\u043e\u0442\u043e\u043a\u043e\u0432\u043e\u0439 \u043e\u0431\u0440\u0430\u0431\u043e\u0442\u043a\u0438 \u0441 \u0443\u0447\u0435\u0442\u043e\u043c \u0441\u043e\u0441\u0442\u043e\u044f\u043d\u0438\u044f \u043f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438 \u0444\u0440\u0435\u0439\u043c\u0432\u043e\u0440\u043a\u0430 <em>Apache Flink<\/em>.<\/p>\n<p>\u0415\u0441\u043b\u0438 \u0432\u044b \u0442\u043e\u043b\u044c\u043a\u043e \u043d\u0430\u0447\u0438\u043d\u0430\u0435\u0442\u0435 \u0437\u043d\u0430\u043a\u043e\u043c\u0441\u0442\u0432\u043e \u0441 <em>Flink<\/em>, <a href=\"https:\/\/www.youtube.com\/@flinkforward\">\u0437\u0434\u0435\u0441\u044c<\/a> \u043c\u043e\u0436\u043d\u043e \u043d\u0430\u0439\u0442\u0438 \u043e\u0431\u0438\u043b\u044c\u043d\u043e\u0435 \u043a\u043e\u043b\u0438\u0447\u0435\u0441\u0442\u0432\u043e \u043c\u0430\u0442\u0435\u0440\u0438\u0430\u043b\u043e\u0432.<\/p>\n<h2>\u041f\u043e\u0441\u0442\u0430\u043d\u043e\u0432\u043a\u0430 \u0437\u0430\u0434\u0430\u0447\u0438<\/h2>\n<p>\u0418\u0442\u0430\u043a, \u0434\u0430\u0432\u0430\u0439\u0442\u0435 \u043f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u0438\u043c \u0431\u0430\u0437\u0443 \u0434\u0430\u043d\u043d\u044b\u0445 (\u043f\u0443\u0441\u0442\u044c \u044d\u0442\u043e \u0431\u0443\u0434\u0435\u0442 <em>PostgreSQL<\/em>) \u0441\u043e\u0441\u0442\u043e\u044f\u0449\u0443\u044e \u0438\u0437 \u0442\u0440\u0435\u0445 \u0442\u0430\u0431\u043b\u0438\u0446:<\/p>\n<ol>\n<li>\n<p><em>Clients<\/em> &#8212; \u043e\u0431\u0449\u0430\u044f \u0438\u043d\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u044f \u043f\u043e \u043a\u043b\u0438\u0435\u043d\u0442\u0443:<\/p>\n<\/li>\n<\/ol>\n<div>\n<div class=\"table\">\n<table>\n<tbody>\n<tr>\n<th>\n<p align=\"left\">\u0418\u043c\u044f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u0422\u0438\u043f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435<\/p>\n<\/th>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">clientId<\/p>\n<\/td>\n<td>\n<p align=\"left\">int<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0443\u043d\u0438\u043a\u0430\u043b\u044c\u043d\u044b\u0439 \u0438\u0434\u0435\u043d\u0442\u0438\u0444\u0438\u043a\u0430\u0442\u043e\u0440 \u043a\u043b\u0438\u0435\u043d\u0442\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">name<\/p>\n<\/td>\n<td>\n<p align=\"left\">string<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0438\u043c\u044f \u043a\u043b\u0438\u0435\u043d\u0442\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">surname<\/p>\n<\/td>\n<td>\n<p align=\"left\">string<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0444\u0430\u043c\u0438\u043b\u0438\u044f \u043a\u043b\u0438\u0435\u043d\u0442\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">patronymic<\/p>\n<\/td>\n<td>\n<p align=\"left\">optional[string]<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u043e\u0442\u0447\u0435\u0441\u0442\u0432\u043e \u043a\u043b\u0438\u0435\u043d\u0442\u0430 (\u043d\u0435\u043e\u0431\u044f\u0437\u0430\u0442\u0435\u043b\u044c\u043d\u043e\u0435 \u043f\u043e\u043b\u0435)<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">sex<\/p>\n<\/td>\n<td>\n<p align=\"left\">string<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u043f\u043e\u043b<\/p>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<ol start=\"2\">\n<li>\n<p><em>ClientCompany<\/em> &#8212; \u0438\u043d\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u044f \u043e \u043a\u043e\u043c\u043f\u0430\u043d\u0438\u0438 \u043a\u043b\u0438\u0435\u043d\u0442\u0430:<\/p>\n<\/li>\n<\/ol>\n<div>\n<div class=\"table\">\n<table>\n<tbody>\n<tr>\n<th>\n<p align=\"left\">\u0418\u043c\u044f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u0422\u0438\u043f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435<\/p>\n<\/th>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">clientId<\/p>\n<\/td>\n<td>\n<p align=\"left\">int<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0443\u043d\u0438\u043a\u0430\u043b\u044c\u043d\u044b\u0439 \u0438\u0434\u0435\u043d\u0442\u0438\u0444\u0438\u043a\u0430\u0442\u043e\u0440 \u043a\u043b\u0438\u0435\u043d\u0442\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">companyId<\/p>\n<\/td>\n<td>\n<p align=\"left\">int<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0443\u043d\u0438\u043a\u0430\u043b\u044c\u043d\u044b\u0439 \u0438\u0434\u0435\u043d\u0442\u0438\u0444\u0438\u043a\u0430\u0442\u043e\u0440 \u043a\u043e\u043c\u043f\u0430\u043d\u0438\u0438<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">companyName<\/p>\n<\/td>\n<td>\n<p align=\"left\">string<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u043d\u0430\u0437\u0432\u0430\u043d\u0438\u0435 \u043a\u043e\u043c\u043f\u0430\u043d\u0438\u0438<\/p>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<ol start=\"3\">\n<li>\n<p><em>Payment<\/em> &#8212; \u0438\u043d\u0444\u043e\u0440\u043c\u0430\u0446\u0438\u044f \u043e \u043f\u043b\u0430\u0442\u0435\u0436\u0430\u0445 \u043a\u043b\u0438\u0435\u043d\u0442\u0430:<\/p>\n<\/li>\n<\/ol>\n<div>\n<div class=\"table\">\n<table>\n<tbody>\n<tr>\n<th>\n<p align=\"left\">\u0418\u043c\u044f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u0422\u0438\u043f \u043f\u043e\u043b\u044f<\/p>\n<\/th>\n<th>\n<p align=\"left\">\u041e\u043f\u0438\u0441\u0430\u043d\u0438\u0435<\/p>\n<\/th>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">clientId<\/p>\n<\/td>\n<td>\n<p align=\"left\">int<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0443\u043d\u0438\u043a\u0430\u043b\u044c\u043d\u044b\u0439 \u0438\u0434\u0435\u043d\u0442\u0438\u0444\u0438\u043a\u0430\u0442\u043e\u0440 \u043a\u043b\u0438\u0435\u043d\u0442\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">amount<\/p>\n<\/td>\n<td>\n<p align=\"left\">int<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0441\u0443\u043c\u043c\u0430 \u043f\u043b\u0430\u0442\u0435\u0436\u0430<\/p>\n<\/td>\n<\/tr>\n<tr>\n<td>\n<p align=\"left\">tmMs<\/p>\n<\/td>\n<td>\n<p align=\"left\">long<\/p>\n<\/td>\n<td>\n<p align=\"left\">\u0432\u0440\u0435\u043c\u044f \u0441\u0432\u0435\u0440\u0448\u0435\u043d\u0438\u044f \u043f\u043b\u0430\u0442\u0435\u0436\u0430<\/p>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<p>\u0411\u0443\u0434\u0435\u043c \u043f\u043e\u043b\u0443\u0447\u0430\u0442\u044c \u043e\u0431\u043d\u043e\u0432\u043b\u0435\u043d\u0438\u044f \u043f\u043e \u0432\u0441\u0435\u043c \u0442\u0430\u0431\u043b\u0438\u0446\u0430\u043c \u0447\u0435\u0440\u0435\u0437 <a href=\"https:\/\/ru.wikipedia.org\/wiki\/%D0%97%D0%B0%D1%85%D0%B2%D0%B0%D1%82_%D0%B8%D0%B7%D0%BC%D0%B5%D0%BD%D0%B5%D0%BD%D0%B8%D1%8F_%D0%B4%D0%B0%D0%BD%D0%BD%D1%8B%D1%85\">CDC<\/a>-\u043a\u0430\u043d\u0430\u043b (\u043f\u0443\u0441\u0442\u044c \u044d\u0442\u043e \u0431\u0443\u0434\u0435\u0442 <a href=\"https:\/\/debezium.io\/\">Debezium<\/a>) \u0432 \u0442\u043e\u043f\u0438\u043a <a href=\"https:\/\/kafka.apache.org\/\">Kafka<\/a> \u0432 \u0444\u043e\u0440\u043c\u0430\u0442\u0435 <a href=\"https:\/\/www.json.org\/json-en.html\">Json<\/a>. \u0417\u0430\u0434\u0430\u0447\u0435\u0439 \u043d\u0430\u0448\u0435\u0439 <em>Job<\/em>-\u044b \u0431\u0443\u0434\u0435\u0442 \u0441\u043b\u0443\u0448\u0430\u0442\u044c \u0442\u043e\u043f\u0438\u043a\u0438 \u0438 \u0432\u044b\u0434\u0430\u0432\u0430\u0442\u044c \u043e\u0431\u043d\u043e\u0432\u043b\u0435\u043d\u0438\u044f \u043f\u043e \u043a\u043b\u0438\u0435\u043d\u0442\u0443 (\u043d\u0430\u043f\u0440\u0438\u043c\u0435\u0440: \u043a\u043b\u0438\u0435\u043d\u0442 \u0441\u043c\u0435\u043d\u0438\u043b \u043a\u043e\u043c\u043f\u0430\u043d\u0438\u044e) \u0432\u043e \u0432\u043d\u0435\u0448\u043d\u044e\u044e \u0441\u0438\u0441\u0442\u0435\u043c\u0443 (\u0432 \u0440\u0430\u043c\u043a\u0430\u0445 \u0441\u0442\u0430\u0442\u044c\u0438 \u0432 \u043a\u0430\u0447\u0435\u0441\u0442\u0432\u0435 \u0432\u043d\u0435\u0448\u043d\u0435\u0439 \u0441\u0438\u0441\u0442\u0435\u043c\u044b \u0431\u0443\u0434\u0435\u043c \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c \u043a\u043e\u043d\u0441\u043e\u043b\u044c \u0432\u044b\u0432\u043e\u0434\u0430).<\/p>\n<figure class=\"\">\n<div><figcaption>\u0420\u0438\u0441\u0443\u043d\u043e\u043a 1 &#8212; \u0421\u0445\u0435\u043c\u0430 \u0431\u0438\u0437\u043d\u0435\u0441\u0441 \u043f\u0440\u043e\u0446\u0435\u0441\u0441\u0430<\/figcaption><\/div>\n<\/figure>\n<h2>\u041d\u0430\u0447\u0438\u043d\u0430\u0435\u043c \u043f\u0440\u043e\u0433\u0440\u0430\u043c\u043c\u0438\u0440\u043e\u0432\u0430\u0442\u044c<\/h2>\n<p>\u041d\u0430\u0448 \u0442\u0435\u0445\u043d\u043e\u043b\u043e\u0433\u0438\u0447\u0435\u0441\u043a\u0438\u0439 \u0441\u0442\u0435\u043a:<\/p>\n<ol>\n<li>\n<p><a href=\"https:\/\/docs.scala-lang.org\/tour\/tour-of-scala.html\">Scala<\/a> v2.12.10<\/p>\n<\/li>\n<li>\n<p><a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-stable\/\">Flink<\/a> v1.16.0<\/p>\n<\/li>\n<li>\n<p><a href=\"https:\/\/circe.github.io\/circe\/\">Circe<\/a> v0.14.3<\/p>\n<\/li>\n<\/ol>\n<p>\u041c\u044b \u0440\u0430\u0437\u0431\u0435\u0440\u0435\u043c 2 \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0430 \u0440\u0435\u0448\u0435\u043d\u0438\u044f:<\/p>\n<ol>\n<li>\n<p>\u041f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438 <a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-release-1.16\/docs\/dev\/table\/overview\/\">Table API<\/a><\/p>\n<\/li>\n<li>\n<p>\u041f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438 \u043a\u043b\u0430\u0441\u0441\u0438\u0447\u0435\u0441\u043a\u043e\u0433\u043e <a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-release-1.16\/docs\/libs\/state_processor_api\/#state-processor-api\">State Processor API<\/a><\/p>\n<\/li>\n<\/ol>\n<p>\u041f\u0440\u0435\u0436\u0434\u0435 \u0447\u0435\u043c \u043f\u0438\u0441\u0430\u0442\u044c \u043e\u0441\u043d\u043e\u0432\u043d\u0443\u044e \u043b\u043e\u0433\u0438\u043a\u0443 \u043d\u0430\u043c \u043f\u043e\u0442\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f \u0440\u0435\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u0442\u044c \u043d\u0435\u0441\u043a\u043e\u043b\u044c\u043a\u043e \u043e\u0431\u0449\u0438\u0445 \u0448\u0430\u0433\u043e\u0432, \u043a\u043e\u0442\u043e\u0440\u044b\u043c\u0438 \u043c\u044b \u0431\u0443\u0434\u0435\u043c \u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c\u0441\u044f \u0432 \u043e\u0431\u0435\u0438\u0445 \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044f\u0445.<\/p>\n<p>\u041f\u0435\u0440\u0432\u043e\u0435 \u0447\u0442\u043e \u043d\u0430\u043c \u043f\u043e\u0442\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f &#8212; \u044d\u0442\u043e \u043d\u0430\u0443\u0447\u0438\u0442\u044c\u0441\u044f \u0447\u0438\u0442\u0430\u0442\u044c \u0442\u043e\u043f\u0438\u043a\u0438 <em>Kafka<\/em>, \u0432 \u044d\u0442\u043e\u043c \u043d\u0430\u043c \u043f\u043e\u043c\u043e\u0436\u0435\u0442 \u0443\u0436\u0435 \u0433\u043e\u0442\u043e\u0432\u044b\u0439 <a href=\"https:\/\/alpinegizmo.github.io\/flink-docs\/dev\/connectors\/kafka.html\">Flink Kafka Connector<\/a>:<\/p>\n<pre><code class=\"scala\">sealed trait KafkaConsumerSource {    def configure[A: Decoder: ClassTag](bootstrapServers: String, topicName: String): KafkaSource[A] = {     KafkaSource       .builder()       .setBootstrapServers(bootstrapServers)       .setGroupId(s\"$topicName-group\")       .setTopics(topicName)       .setDeserializer(new KafkaJsonRecordDeserializer[A])       .setStartingOffsets(OffsetsInitializer.earliest())       .build()   } }  object KafkaConsumerSource extends KafkaConsumerSource {    def configureKafkaDataSource[A: Decoder: ClassTag](       topicName: String   )(implicit env: StreamExecutionEnvironment,     typeInformation: TypeInformation[A]): DataStream[A] = {     val kafkaSource = KafkaConsumerSource.configure[A](Kafka.BootstrapServers, topicName)     env.fromSource[A](kafkaSource, WatermarkStrategy.noWatermarks[A](), s\"$topicName-flow\")   } }  class KafkaJsonRecordDeserializer[A: Decoder: ClassTag] extends KafkaRecordDeserializationSchema[A] {    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[A]): Unit = {     val value = new String(record.value())     val obj   = Serde.toObj(value).toOption \/\/ \u0432 \u0441\u043b\u0443\u0447\u0430\u0435 \u043d\u0435\u0443\u0434\u0430\u0447\u0438 \u043f\u0440\u0438 \u0434\u0435\u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438, \u043e\u0448\u0438\u0431\u043a\u0430 \u043f\u0440\u043e\u0441\u0442\u043e \u0438\u0433\u043d\u043e\u0440\u0438\u0440\u0443\u0435\u0442\u0441\u044f \u0438 \u0441\u043e\u0431\u044b\u0442\u0438\u0435 \u043e\u0442\u0431\u0440\u0430\u0441\u044b\u0432\u0430\u0435\u0442\u0441\u044f (\u0432 \u0440\u0435\u0430\u043b\u044c\u043d\u044b\u0445 \u043f\u0440\u043e\u0435\u043a\u0442\u0430\u0445 \u0442\u0430\u043a \u0434\u0435\u043b\u0430\u0442\u044c \u043d\u0435 \u0441\u0442\u043e\u0438\u0442)     obj.foreach(out.collect)   }    override def getProducedType: TypeInformation[A] = {     TypeExtractor       .getForClass(classTag[A].runtimeClass.asInstanceOf[Class[A]])       .asInstanceOf[TypeInformation[A]]   } } <\/code><\/pre>\n<p>\u0412 \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438\/\u0434\u0435\u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438 \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0439 \u043d\u0430\u043c \u043f\u043e\u043c\u043e\u0436\u0435\u0442 <em>Circe<\/em>:<\/p>\n<pre><code class=\"scala\">object Serde {    def toObj[A: Decoder](json: String): Either[circe.Error, A] = decode[A](json)    def toJson[A: Encoder](obj: A): String = obj.asJson.toString() } <\/code><\/pre>\n<p>\u041c\u044b \u043d\u0435\u043c\u043d\u043e\u0433\u043e \u0443\u043f\u0440\u043e\u0441\u0442\u0438\u043c \u0441\u0435\u0431\u0435 \u0437\u0430\u0434\u0430\u0447\u0443, \u0441\u044b\u043c\u0438\u0442\u0438\u0440\u0443\u0435\u043c \u043f\u043e\u0432\u0435\u0434\u0435\u043d\u0438\u0435 <em>CDC<\/em>-\u043a\u0430\u043d\u0430\u043b\u0430, \u043e\u0442\u043f\u0440\u0430\u0432\u043b\u044f\u044f \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u044f \u0442\u0440\u0435\u0431\u0443\u0435\u043c\u043e\u0433\u043e \u0444\u043e\u0440\u043c\u0430\u0442\u0430 \u043d\u0430\u043f\u0440\u044f\u043c\u0443\u044e \u0432 \u0442\u043e\u043f\u0438\u043a <em>Kafka<\/em>:<\/p>\n<pre><code class=\"scala\">case class Client (before: Option[Client.Value], after: Option[Client.Value], op: String)  object Client {   case class Value(clientId: Int, name: String, surname: String, patronymic: Option[String], sex: String) }  case class ClientCompany (before: Option[ClientCompany.Value], after: Option[ClientCompany.Value], op: String)  object ClientCompany {   case class Value(clientId: Int, companyId: Int, companyName: String) }  case class Payment (before: Option[Payment.Value], after: Option[Payment.Value], op: String)  object Payment {   case class Value(clientId: Int, amount: Int, tmMs: Long) }  <\/code><\/pre>\n<p>\u0422\u0435\u043f\u0435\u0440\u044c \u043c\u044b \u0433\u043e\u0442\u043e\u0432\u044b \u043f\u0440\u0438\u0441\u0442\u0443\u043f\u0438\u0442\u044c \u043a \u043e\u0441\u043d\u043e\u0432\u043d\u043e\u0439 \u0447\u0430\u0441\u0442\u0438.<\/p>\n<h2>\u0418\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435 Flink Table API<\/h2>\n<p>\u0418\u0442\u0430\u043a, \u043f\u0440\u0435\u0436\u0434\u0435 \u0432\u0441\u0435\u0433\u043e \u043d\u0430\u043c \u043d\u0443\u0436\u043d\u043e \u043f\u043e\u043b\u0443\u0447\u0438\u0442\u044c \u043d\u0435\u043e\u0431\u0445\u043e\u0434\u0438\u043c\u043e\u0435 \u043e\u043a\u0440\u0443\u0436\u0435\u043d\u0438\u0435, \u043f\u043e\u0441\u043a\u043e\u043b\u044c\u043a\u0443 \u043c\u044b \u0431\u0443\u0434\u0435\u043c \u0440\u0430\u0431\u043e\u0442\u0430\u0442\u044c \u0432 \u043f\u043e\u0442\u043e\u043a\u043e\u0432\u043e\u043c \u0440\u0435\u0436\u0438\u043c\u0435, \u0437\u0430\u0440\u0443\u0447\u0438\u043c\u0441\u044f \u0442\u0430\u043a\u043e\u0432\u044b\u043c:<\/p>\n<pre><code class=\"scala\">implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment <\/code><\/pre>\n<p>\u0414\u0430\u043b\u0435\u0435 \u043f\u043e\u043b\u0443\u0447\u0438\u043c \u043e\u043a\u0440\u0443\u0436\u0435\u043d\u0438\u0435 \u0434\u043b\u044f \u0434\u043e\u0441\u0442\u0443\u043f\u0430 \u043a <em>Table API<\/em> :<\/p>\n<pre><code class=\"scala\">implicit val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) <\/code><\/pre>\n<p>\u0424\u043e\u0440\u043c\u0438\u0440\u0443\u0435\u043c <em>Kafka Source<\/em> \u043f\u0440\u0438 \u043f\u043e\u043c\u043e\u0449\u0438 <a href=\"#%D0%9D%D0%B0%D1%87%D0%B8%D0%BD%D0%B0%D0%B5%D0%BC-%D0%BF%D1%80%D0%BE%D0%B3%D1%80%D0%B0%D0%BC%D0%BC%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D1%82%D1%8C\">\u043a\u043e\u0434\u0430<\/a> \u043a\u043e\u0442\u043e\u0440\u044b\u0439 \u043c\u044b \u043f\u043e\u0434\u0433\u043e\u0442\u043e\u0432\u0438\u043b\u0438 \u0432\u044b\u0448\u0435:<\/p>\n<pre><code class=\"scala\">val clients: DataStream[Client] = KafkaConsumerSource.configureKafkaDataSource[Client](\"clients\") <\/code><\/pre>\n<p>\u0422\u0435\u043f\u0435\u0440\u044c \u043d\u0430\u043c \u043e\u0441\u0442\u0430\u043b\u0441\u044f \u0432\u0441\u0435\u0433\u043e \u043e\u0434\u0438\u043d \u0448\u0430\u0433 \u0434\u043e \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u0438\u044f \u043f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u043b\u0435\u043d\u0438\u044f \u0441 \u043a\u043e\u0442\u043e\u0440\u044b\u043c \u043d\u0430\u043c \u043f\u0440\u0435\u0434\u0441\u0442\u043e\u0438\u0442 \u0440\u0430\u0431\u043e\u0442\u0430\u0442\u044c &#8212; <a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-release-1.17\/docs\/dev\/table\/concepts\/dynamic_tables\/\">Dynamic Tables<\/a>.  \u0427\u0442\u043e\u0431\u044b \u0441\u0434\u0435\u043b\u0430\u0442\u044c \u044d\u0442\u043e\u0442 \u0448\u0430\u0433 \u043c\u0430\u043a\u0441\u0438\u043c\u0430\u043b\u044c\u043d\u043e \u043f\u0440\u043e\u0441\u0442\u044b\u043c, \u0432\u043e\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c\u0441\u044f \u0432\u043e\u0437\u043c\u043e\u0436\u043d\u043e\u0441\u0442\u044f\u043c\u0438 <a href=\"https:\/\/docs.scala-lang.org\/overviews\/core\/implicit-classes.html\">Scala<\/a> \u0438 \u0440\u0430\u0441\u0448\u0438\u0440\u0438\u043c <em>DataStream<\/em> \u043c\u0435\u0442\u043e\u0434\u043e\u043c <em>toStreamTable<\/em>:<\/p>\n<pre><code class=\"scala\">implicit class DataStreamOps[A: RowTransformer](val stream: DataStream[A]) {    def toStreamTable(primaryKey: String*)(implicit env: StreamTableEnvironment): Table = {     val transformer = RowTransformer[A]     stream       .map(transformer.toRow(_))       .flatMap(_.toSeq)(transformer.typeInformation)       .toChangelogTable(         env,         Schema           .newBuilder()           .primaryKey(primaryKey: _*)           .build(),         ChangelogMode.upsert()       )   } } <\/code><\/pre>\n<p>\u041d\u0430 \u0432\u0445\u043e\u0434 \u0444\u0443\u043d\u043a\u0446\u0438\u0438 \u043f\u043e\u0434\u0430\u0435\u0442\u0441\u044f \u0441\u043f\u0438\u0441\u043e\u043a \u0438\u043c\u0435\u043d \u043f\u043e\u043b\u0435\u0439, \u043a\u043e\u0442\u043e\u0440\u044b\u0435 \u0441\u043e\u0441\u0442\u0430\u0432\u044f\u0442 \u043f\u0435\u0440\u0432\u0438\u0447\u043d\u044b\u0439 \u043a\u043b\u044e\u0447 \u043d\u0430\u0448\u0435\u0439 \u0442\u0430\u0431\u043b\u0438\u0446\u044b. \u041a\u0430\u043a \u0438 \u0432 \u0431\u043e\u043b\u044c\u0448\u0438\u043d\u0441\u0442\u0432\u0435 \u0444\u0440\u0435\u0439\u043c\u0432\u043e\u0440\u043a\u043e\u0432 \u043d\u0430\u0438\u043c\u0435\u043d\u044c\u0448\u0435\u0439 \u0435\u0434\u0438\u043d\u0438\u0446\u0435\u0439 \u0434\u043b\u044f \u0440\u0430\u0431\u043e\u0442\u044b \u0441 \u0442\u0430\u0431\u043b\u0438\u0446\u0430\u043c\u0438 \u044f\u0432\u043b\u044f\u0435\u0442\u0441\u044f \u0441\u0442\u0440\u043e\u043a\u0430 (<a href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-stable\/api\/java\/org\/apache\/flink\/types\/Row.html\">Row<\/a>). \u041d\u0443\u0436\u0435\u043d \u0441\u043f\u043e\u0441\u043e\u0431 \u043f\u0440\u0435\u043e\u0431\u0440\u0430\u0437\u043e\u0432\u0430\u043d\u0438\u044f \u0432\u0441\u0435\u0445 \u043d\u0430\u0448\u0438\u0445 \u0441\u0443\u0449\u043d\u043e\u0441\u0442\u0435\u0439 \u0432 \u0442\u0438\u043f <em>Row<\/em>, \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0438\u043c \u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c\u0441\u044f \u0432\u043e\u0437\u043c\u043e\u0436\u043d\u043e\u0441\u0442\u044f\u043c\u0438 <em>Scala<\/em>, \u043d\u0430 \u044d\u0442\u043e\u0442 \u0440\u0430\u0437 \u0442\u0435\u043c \u0447\u0442\u043e <em>Scala<\/em> \u043f\u043e\u0437\u0432\u043e\u043b\u044f\u0435\u0442 \u043f\u0438\u0441\u0430\u0442\u044c \u043a\u043e\u0434 \u0432 <a href=\"https:\/\/ru.wikipedia.org\/wiki\/%D0%A4%D1%83%D0%BD%D0%BA%D1%86%D0%B8%D0%BE%D0%BD%D0%B0%D0%BB%D1%8C%D0%BD%D0%BE%D0%B5_%D0%BF%D1%80%D0%BE%D0%B3%D1%80%D0%B0%D0%BC%D0%BC%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D0%B5\">\u0444\u0443\u043d\u043a\u0446\u0438\u043e\u043d\u0430\u043b\u044c\u043d\u043e\u043c \u0441\u0442\u0438\u043b\u0435<\/a>, \u0430 \u043a\u043e\u043d\u043a\u0440\u0435\u0442\u043d\u043e \u043c\u044b \u0432\u043e\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u043c\u0441\u044f <a href=\"https:\/\/habr.com\/ru\/companies\/tinkoff\/articles\/147759\/\">Type Class<\/a>-\u043c\u0438. \u041d\u0430\u0448 <em>Type Class<\/em> \u0431\u0443\u0434\u0435\u0442 \u0438\u043c\u0435\u0442\u044c \u0441\u043b\u0435\u0434\u0443\u044e\u0449\u0438\u0439 \u0432\u0438\u0434:<\/p>\n<pre><code class=\"scala\">sealed trait RowTransformer[A] {   def toRow(entity: A): Option[Row]   protected val fieldsName: Array[String]   implicit def typeInformation: TypeInformation[Row] } <\/code><\/pre>\n<p>\u041f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u0438\u043c \u0440\u0435\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u044e \u0434\u043b\u044f \u043e\u0434\u043d\u043e\u0439 \u0438\u0437 \u0442\u0430\u0431\u043b\u0438\u0446 (\u0434\u043b\u044f \u043e\u0441\u0442\u0430\u043b\u044c\u043d\u044b\u0445 \u0430\u043b\u0433\u043e\u0440\u0438\u0442\u043c \u0431\u0443\u0434\u0435\u0442 \u0430\u043d\u0430\u043b\u043e\u0433\u0438\u0447\u0435\u043d):<\/p>\n<pre><code class=\"scala\">implicit case object ClientRow extends RowTransformer[Client] {   override def toRow(entity: Client): Option[Row] = {     val kind   = kindOf(entity.operation)     val record = entity.before.orElse(entity.after)     record.map(r => Row.ofKind(kind, Int.box(r.clientId), r.name, r.surname, r.patronymic.orNull, r.sex))   }    override implicit def typeInformation: TypeInformation[Row] = {     Types.ROW_NAMED(fieldsName, Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)   }    override protected val fieldsName: Array[String] = Array(\"clientId\", \"name\", \"surname\", \"patronymic\", \"sex\") } <\/code><\/pre>\n<p>\u0422\u0435\u043f\u0435\u0440\u044c \u043c\u044b \u043c\u043e\u0436\u0435\u043c \u043f\u0440\u0435\u043e\u0431\u0440\u0430\u0437\u043e\u0432\u044b\u0432\u0430\u0442\u044c <code>DataStream<\/code> \u0432 <code>Table<\/code> \u043e\u0434\u043d\u0438\u043c \u0434\u0435\u0439\u0441\u0442\u0432\u0438\u0435\u043c:<\/p>\n<pre><code class=\"scala\">val clients: Table = KafkaConsumerSource.configureKafkaDataSource[Client](\"clients\")   .toStreamTable(\"clientId\") <\/code><\/pre>\n<p>\u042d\u0442\u043e \u0434\u0430\u043b\u0435\u043a\u043e \u043d\u0435 \u0435\u0434\u0438\u043d\u0441\u0442\u0432\u0435\u043d\u043d\u044b\u0439 \u0441\u043f\u043e\u0441\u043e\u0431 \u0441\u043e\u0437\u0434\u0430\u043d\u0438\u044f \u0442\u0430\u0431\u043b\u0438\u0446, \u0435\u0441\u043b\u0438 \u043c\u044b \u0438\u0437\u043d\u0430\u0447\u0430\u043b\u044c\u043d\u043e \u0445\u043e\u0442\u0438\u043c \u0440\u0430\u0431\u043e\u0442\u0430\u0442\u044c \u0441 \u0434\u0430\u043d\u043d\u044b\u043c\u0438 \u043a\u0430\u043a \u0441 \u0442\u0430\u0431\u043b\u0438\u0446\u0435\u0439, \u043d\u0435\u043e\u0431\u044f\u0437\u0430\u0442\u0435\u043b\u044c\u043d\u043e \u0441\u043f\u0435\u0440\u0432\u0430 \u043f\u0440\u0435\u0434\u0441\u0442\u0430\u0432\u043b\u044f\u0442\u044c \u0434\u0430\u043d\u043d\u044b\u0435 \u0432 \u0432\u0438\u0434\u0435 <em>DataStream<\/em>:<\/p>\n<pre><code class=\"scala\">tableEnv.createTable(   \"companies\",   TableDescriptor     .forConnector(\"kafka\")     .schema(       Schema         .newBuilder()         .column(\"clientId\", DataTypes.INT().notNull())         .column(\"companyId\", DataTypes.INT().notNull())         .column(\"companyName\", DataTypes.STRING().notNull())         .primaryKey(\"clientId\")         .build()     )     .option(KafkaConnectorOptions.TOPIC, Seq(\"companies\").asJava)     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, Kafka.BootstrapServers)     .option(KafkaConnectorOptions.PROPS_GROUP_ID, \"companies-group\")     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET)     .option(KafkaConnectorOptions.VALUE_FORMAT, \"debezium-json\")     .build() )  val companies: Table = tableEnv.sqlQuery(\"SELECT * FROM companies\")  companies.print(\"Companies\") <\/code><\/pre>\n<p>\u0418 \u043a\u043e\u043d\u0435\u0447\u043d\u043e \u0436\u0435 \u043a\u0443\u0434\u0430 \u0431\u0435\u0437 <em>SQL<\/em>:<\/p>\n<pre><code class=\"scala\">tableEnv.executeSql(   \"\"\"     |CREATE TABLE payments (clientId INT NOT NULL, amount INT NOT NULL, tmMs BIGINT NOT NULL)     |WITH (     | 'connector' = 'kafka',     | 'topic' = 'payments',     | 'properties.bootstrap.servers' = 'localhost:9092',     | 'properties.group.id' = 'payments-group',     | 'scan.startup.mode' = 'earliest-offset',     | 'format' = 'debezium-json'     |)     |\"\"\".stripMargin)  val payments: Table = tableEnv.sqlQuery(\"SELECT * FROM payments\")  payments.print(\"Payments\") <\/code><\/pre>\n<p>\u041c\u044b \u043e\u043a\u043e\u043d\u0447\u0430\u0442\u0435\u043b\u044c\u043d\u043e \u043f\u043e\u0434\u0433\u043e\u0442\u043e\u0432\u0438\u043b\u0438\u0441\u044c \u043a \u043d\u0430\u043f\u0438\u0441\u0430\u043d\u0438\u044e \u043e\u0441\u043d\u043e\u0432\u043d\u043e\u0439 \u043b\u043e\u0433\u0438\u043a\u0438. \u0414\u043b\u044f \u043d\u0430\u0448\u0435\u0439 \u0437\u0430\u0434\u0430\u0447\u0438 \u0431\u0443\u0434\u0435\u043c \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u0442\u044c <em>inner join<\/em>, \u0442.\u0435. \u043f\u0440\u0438 \u043f\u043e\u043b\u0443\u0447\u0435\u043d\u0438\u0438 \u043d\u043e\u0432\u043e\u0433\u043e \u0441\u043e\u0431\u044b\u0442\u0438\u044f \u043c\u044b \u0431\u0443\u0434\u0435\u043c \u0432\u044b\u0434\u0430\u0432\u0430\u0442\u044c \u0440\u0435\u0437\u0443\u043b\u044c\u0442\u0430\u0442 \u0432\u043e <em>\u0432\u043d\u0435\u0448\u043d\u044e\u044e \u0441\u0438\u0441\u0442\u0435\u043c\u0443<\/em> \u0442\u043e\u043b\u044c\u043a\u043e \u0432 \u0441\u043b\u0443\u0447\u0430\u0435 \u043d\u0430\u043b\u0438\u0447\u0438\u044f \u0434\u0430\u043d\u043d\u044b\u0445 \u043f\u043e \u043a\u043b\u044e\u0447\u0443 \u0432\u043e \u0432\u0441\u0435\u0445 \u0442\u0430\u0431\u043b\u0438\u0446\u0430\u0445.<\/p>\n<pre><code class=\"scala\">clients   .join(companies, $\"client.clientId\" === $\"company.clientId\")   .join(payments, $\"client.clientId\" === $\"payment.clientId\")   .select(\"client.clientId\", \"name\", \"surname\", \"companyId\", \"companyName\", \"amount\", \"timestamp\")   .toChangelogStream(Schema.derived(), ChangelogMode.upsert())   .print(\"Portfolio\") <\/code><\/pre>\n<figure class=\"\">\n<div><figcaption>\u0420\u0438\u0441\u0443\u043d\u043e\u043a 2 &#8212; Dynamic Tables<\/figcaption><\/div>\n<\/figure>\n<h2>\u0418\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043d\u0438\u0435 State Processor API<\/h2>\n<p>\u0421\u043d\u043e\u0432\u0430 \u043d\u0430\u0447\u043d\u0435\u043c \u0441 \u043e\u043a\u0440\u0443\u0436\u0435\u043d\u0438\u044f, \u0432 \u044d\u0442\u043e\u043c \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0435 \u043d\u0430\u043c \u0431\u0443\u0434\u0435\u0442 \u0434\u043e\u0441\u0442\u0430\u0442\u043e\u0447\u043d\u043e \u0442\u043e\u043b\u044c\u043a\u043e \u043e\u043a\u0440\u0443\u0436\u0435\u043d\u0438\u044f \u0434\u043b\u044f \u0440\u0430\u0431\u043e\u0442\u044b \u0432 \u043f\u043e\u0442\u043e\u043a\u043e\u0432\u043e\u043c \u0440\u0435\u0436\u0438\u043c\u0435:<\/p>\n<pre><code class=\"scala\">implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment <\/code><\/pre>\n<p>\u0412 \u043f\u0440\u043e\u0448\u043b\u043e\u043c \u0432\u0430\u0440\u0438\u0430\u043d\u0442\u0435 \u043c\u044b \u043f\u0440\u0438\u0432\u043e\u0434\u0438\u043b\u0438 \u0432\u0441\u0435 \u043d\u0430\u0448\u0438 \u0441\u0443\u0449\u043d\u043e\u0441\u0442\u0438 \u043a \u0442\u0438\u043f\u0443 <em>Table<\/em>, \u0441\u0435\u0439\u0447\u0430\u0441 \u043d\u0430\u043c \u043d\u0443\u0436\u043d\u043e \u0441\u0434\u0435\u043b\u0430\u0442\u044c \u043d\u0435\u0447\u0442\u043e \u043f\u043e\u0445\u043e\u0436\u0435\u0435:<\/p>\n<pre><code class=\"scala\">case class PortfolioState(    id: Int,    client: Option[Client.Value],    company: Option[ClientCompany.Value],    payment: Option[Payment.Value]) {    def complete: Option[Portfolio] = {     for {       client  &lt;- client       company &lt;- company       payment &lt;- payment     } yield {       Portfolio(         client.clientId,         client.name,         client.surname,         company.companyId,         company.companyName,         payment.amount,         payment.timestamp       )     }   } } <\/code><\/pre>\n<p>\u041c\u044b \u043e\u043f\u0438\u0441\u0430\u043b\u0438<\/p>\n<\/div>\n<\/div>\n<\/div>\n<\/div>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[],"tags":[],"class_list":["post-349722","post","type-post","status-publish","format-standard","hentry"],"_links":{"self":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/349722","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=349722"}],"version-history":[{"count":0,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/349722\/revisions"}],"wp:attachment":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=349722"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=349722"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=349722"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}