Apache Flink: Flink Table API & SQL, часть 2

от автора

Table API — это API для взаимодействия с данными в табличном виде. Если рассматривать аналогию со Spark, то наша таблица в Table API — это датафреймы в Spark. Нет четкой структуры, каждая точка потока — таблица, то есть после преобразования таблицы нам возвращается таблица, как это происходит и в Spark. 

Так же, как и Spark, Table API использует свой диалект SQL, который можно использовать над таблицами. Таблицу мы можем зарегистрировать в каталоге Table API и обращаться к ней с помощью SQL, используя команду Execute SQL. Все преобразования можно делать как обращаясь к таблице напрямую, через метод, так и при помощи SQL, то есть при помощи Select можно создать новую таблицу. Может запускаться как приложение, так и интерактивно SQL-запросами. То есть если у вас развернут Flink-кластер, то можно к нему подключиться при помощи Flink SQL, вбивать команды, создавать каталоги, подключаться к каталогам и проворачивать, например, батчевые SQL-запросы, которые перетягивать данные.

Главная фишка: источники и приемники могут создаваться и конфигурироваться при помощи DDL SQL.

Перейдем к коду. 

Напишем примитивный “Hello, World!”, который делает минимум преобразований и может запуститься прямо из IDE.

import org.apache. flink. streaming.api. scala. StreamExecutionEnvironment import org.apache. flink. table.api. Expressions. {lit, row} import org.apache. flink. table.api. bridge.scala. StreamTableEnvironment import org.apache.flink.table.api. {DataTypes, FieldExpression, Table, WithOperations} // create environments of both APIs val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create (env) // Источник val inputTable: Table = tableEnv. fromValues ( DataTypes. ROW( DataTypes. FIELD( name = "id" DataTypes. DECIMAL ( precision = 10, scale = 2)), DataTypes. FIELD ( name = "name", DataTypes. STRING()), DataTypes.FIELD ( name = "probe" DataTypes. STRING)) row( head = 1, tail = "Saint Petersburg", "sp"), row( head = 2L , tail = "Saint Petersburg2", "sp"), row( head = 3, tail = "Vikhino", "mn"), row( head = 4L, tail = "Saint Petersburg3", "sp"), row( head = 5, tail = "Mytishi", "mn"), row( head : 6L, tail = "Voronezh" "vr"), row( head = 7, tail = "Leningrad", "sp"), row( head = 8L, tail = "Voronezh2", "vr") , row( head = 9L, tail = "NULL POINTER EXCEPTION", "NU")

Мы должны создать все окружение, подключение к окружению кластера нашего Flink. Создаем сначала StreamExecutionEnvironment, а на его основе уже StreamTableEnvironment со всеми необходимыми каталогами и со всеми подключениями. Далее создаем через tableEnv из значений таблицу, задаем схему, которая будет в таблице, у нас это id, name и probe, и создаем некоторое количество значений. Вот у нас готовый источник, наша таблица. Она будет источником для последующих преобразований.

Далее мы создаем наши приемники, наши Sink’и при помощи SQL.

С помощью команды executeSql мы в наш локальный каталог создаем таблицу. Например, таблицу radcom_cdr_s1ap_mn с двумя значениями, и создаем здесь connector — связующее звено между Flink и каким-то источником.

row( head 1, tail = "Saint Petersburg", "sp") , row( head = 2L, tail = "Saint Petersburg2", "sp"), row( head = 3, tail = "Vikhino" "mn"), row( head : 4L, tail = "Saint Petersburg3", "sp"), row( head = 5, tail = "Mytishi" "mn") , row( head = 6L , tail = "Voronezh" "vr"), row( head = 7 , tail = "Leningrad", "sp"), row( head = 8L, tail = "Voronezh2", "vr"), row( head = 9L, tail = "NULL POINTER EXCEPTION", "'NU"') // Первый sink tableEnv. executeSql( statement = "" "CREATE TABLE radcom_cdr_s1ap_mn ( id DECIMAL, name string WITH 'connector' = 'print' 'print- identifier' = 'mn'

Connector’ы могут быть как к Kafka, так и к каким-нибудь базам данных через JDBC и к Hive.  Коннекторов достаточно много, могут быть для тестов коннекторы, которые генерируют данные, здесь выводится коннектор print. У всех коннекторов есть различные свойства, в данном случае есть print-identifier, которое означает, что вывод этой таблицы будет с припиской “mn”.

Здесь создали по такому же типу второй sink, который будет принимать все для sp региона и такой же для vr региона.

// Второй sink tableEnv.executeSql( statement: CREATE TABLE radcom_cdr_slap_sp ( id DECIMAL, name string WITH ( ' connector' = 'print' 'print- identifier' = 'sp' ) "n n) tableEnv. executeSql ( statement = ""I " CREATE TABLE radcom_cdr_slap_vr id DECIMAL, name string WITH ( ' connector' = 'print', 'print-identifier' = 'vr'

Также сделали sink для тех регионов, которые не обозначены у нас как mn, sp и vr. Таким образом, мы создали четыре таблицы, четыре приемника, которые у нас будут принимать данные от потока.

Теперь у нас есть наша таблица-источник, и здесь мы для каждого probe будем фильтровать поток, укажем, что если колонка probe у потока не равна какому-то probe, например, sp, то она должна отместись. А все, что принадлежит sp, должно попасть сюда. Дропаем колонку probe, она должна исчезнуть, потому что в схеме она не обозначена, и вставляем. Кстати, у коннектора print есть еще метод print.

probes. foreach X => { inputTable .filter ( predicate = $"probe" === Lit(x)) .dropColumns ( fields = $"probe") .executeInsert ( tablePath = "radcom_cdr_s1ap_"+x) .print ()i }  inputTable filter (defaultTopic) .dropColumns ( fields = $"probe") .executeInsert ( tablePath= "radcom_cdr_slap_"+"de") .print()

Здесь мы исполняем вставку в какую-то таблицу и здесь для всех трех probe мы вставляем данные. Далее мы для последнего probe вставляем наши данные, если они не подходят под наши условия. Это у нас является expression, точно так же, как колонки column в датафреймах, здесь работает expression. Мы его можем передать в фильтр, в executeInsert и прочее. Помимо таких команд, мы можем, например, использовать addOrReplaceColumn. Мы можем изменить какую-то колонку, например, name передать upperCase, колонка будет называться по-прежнему. Можем еще ее урезать — используем substring(lit(0), lit(5)), берем первые пять.

Теперь запустим наш поток и посмотрим, что произошло.

Вот наша таблица неименованная, вот выписалось все. Колонка mn обработаться не успела еще, но все разделилось по таблицам. Колонка name поменялась.

Такое вот примитивное приложение на Table API. Мы создали его без заморочек, без лишних приседаний, под одним SQL-запросом один sink. 

Теперь как оно будет работать с Kafka.

import org.apache. flink. streaming.api. scala. StreamExecutionEnvironment import org.apache.flink. table.api. {AnyWith0perations, EnvironmentSettings, FieldExpression, Table, lit} import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment object TableExample extends App{ val env: StreamExecutionEnvironment = { val streamExecutionEnvironment = StreamExecutionEnvironment .getExecutionEnvironment streamExecutionEnvironment.setParallelism(3)  streamExecutionEnvironment val Env: StreamTableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings. newInstance().build()) Env.executeSql( | CREATE TABLE example_table_kafka( id DECIMAL, number BIGINT, lip STRING) |WITH ( ' connector' = 'kafka' 'topic' = 'example_topic', 'value. format' = 'csv'

Здесь уже пример приложения, которое будет запускаться без scratch. Его можно скомпилировать, отправить на Flink кластер, и оно тоже будет запускаться.

Как обычно создаем весь необходимый environment и создаем наш источник, который будет коннектиться к Kafka. Здесь описываем, какие колонки будут у нас в Kafka, и будем в Kafka засылать csv.

Развернул в Docker окружение, засылаю какие-то строчки, будет id, будет некоторое число и ip, который имеет пробелы. 

Засылаем данные. Ставим, что коннектором будет Kafka, указываем, какой будет Topic и для value указываем формат. здесь формат будет csv. Теперь весь csv будет автоматически парситься под этот формат. Далее мы указываем, как будет происходить обработка null для csv, то есть что считать null в наших данных, и игнорировать ли нам ошибки.

tEnv.executeSql( 11 11 11 CREATE TABLE example_table_kafka( lid DECIMAL, | number BIGINT, lip STRING) |WITH ( 'connector' = 'kafka' 'topic' = example_topic', 'value. format' = 'csv', 'value. csv.null-literal' = ' ', 'value. csv. ignore-parse-errors' false' properties.bootstrap. servers' 'Localhost: 9093' , properties.group. id' 'example_group', 'scan. startup. mode' = 'earliest-offset' |"". stripMargin)

Если мы будем игнорировать ошибки, то ошибочная строчка просто будет отбрасываться, но если мы не будем игнорировать ошибки, то при ошибочной записи, которая не подходит под эту структуру, все приложение будет падать.

Также через properties указываем все необходимые свойства для соединения с Kafka. Здесь мы указываем startup.mode, его можно указать как earliest-offset, то есть мы будем читать с самого раннего offset, мы можем здесь поставить group-offset, то есть мы будем работать на основе группы, а так мы здесь группу перезапишем. Вот так мы создаем здесь источник с Kafka.

Вообще все properties мы можем получить уже из документации Kafka: что можно использовать, какие форматы, какие значения можем туда подсунуть, как настроить формат, то есть мы можем помимо value настраивать еще key format и так далее. Также есть возможность создать и свои коннекторы Flink к чему-либо. 

Вернемся к нашему приложению.

tEnv. executeSql( *** | CREATE TABLE example_table_kafka( id DECIMAL, | number BIGINT, lip STRING) WITH ( ' connector' = 'kafka' topic' = 'example_topic', 'value. format' = 'csv' 'value.csv.null-literal' 'value. csv. ignore-parse-errors' = 'false', 'properties. bootstrap. servers' = 'Localhost: 9093 ', properties.group. id' 'example_group' 'scan. startup. mode' = earliest-offset' "*".stripMargin)

Здесь мы создали источник и создали приемник. Приемник у нас настраивается в print_table, можно было бы так же настроить к Kafka, но мы хотим видеть результат прямо сейчас, поэтому пока данные будем отправлять в консоль. Еще особенность работы Flink c SQL состоит в том, что выражения SQL работают точно такие же, как и для вставки. То есть если мы захотим вставить из одной таблицы в другую данные, то мы делаем INSERT INTO example_print_table и получаем SELECT.

Получаем id, number, удаляем пустые поля и пишем это как ip. Получаем это все из example_table_kafka. То есть, по сути, у нас три SQL и простенькое приложение, которое перекладывает куда-либо данные из Kafka. Не нужно никаких POJO, но даже если мы не хотим пользоваться SQL, можно действовать следующим образом.

Источник на Kafka мы создали, берем из него все необходимые нам данные.

tEnv. executeSql( statement = "INSERT INTO example_print_table" SELECT id, number, REGEXP REPLACE (ip, as ip FROM example_table_kafka") val tableExample: Table = LEnv. from ( path = "example_table_kafka". tableExample .addOrReplaceColumns($"ip".replace(lit(" "), lit(""))) . executeInsert ( tablePath = "example_print_table")

Создаем table и уже с ним будем взаимодействовать. Через addOrReplaceColumn обращаемся к колонке, удаляем пробелы и производим вставку в таблицу example_print_table.

Теперь запустим наше приложение и посмотрим, что оно выдаст.

Приняли все из Kafka, все вставилось в нашу консоль, то есть в example_print_table. Создали источник, сделали некоторые трансформации и записали это в другой sink. Точно таким же образом мы могли бы поступить с HDFS. 

Из этого следует, что между DataStream и Table API есть некоторые различия. Для того, чтобы создать источник, необходимо создать сначала POJO-класс для него, потом уже десериализатор, создать источник со всеми properties, выкачать их, установить и прочее. Для того, чтобы создать источник в Table API, нам требуется один SQL.

Как только я показываю всем людям, кто занимается Flink, что такое Table API и этот SQL со словами: “Вот так я создаю источник”, на это все удивляются и говорят, что надо использовать Table API.

Однако Table API в таком случае не сильно подходит в таких случаях, где действительно нужен десериализатор. Ко всему прочему, если мы работаем с Kafka, то можно в схеме прописывать и метаинформацию, такую как timestamp, offset, topic. Мы все эти данные тоже можем получить и на основе их создавать новые колонки. Но если нужна десериализация value в другом виде, то для этого лучше использовать DataStream API, там все будет гибче.

Если требуется абсолютно тупое перекладывание из Kafka куда-либо и если требуется работа со стандартными форматами, такими как Avro, csv и json, то тут грех не использовать Table API, тем более все настройки имеются и в документации по поводу коннекторов все описывается нормально.

Далее покажу как наше итоговое приложение преобразовалось из четырех в одно.

val sourceSchemaString: String = Utils.readFile(args(1)) val sourceSql: String = Utils.getSchemaSql(sourceSchemaString) val sourceConfig: SinkConfig = Utils.getKafkaSqlString(config.getConfig(path="source"), sourcesql) tEnv.executeSql(sourceConfig.sql) val sinkSchemaString: String = Utils. readFile(args(2)) val sinkSql: String = Utils.getSchemaSql(sinkSchemaString) val topicList: Seq [Config] config.getConfigList ( path= "sink. topics") config.getConfig( path= "sink. default") val topicConfigs: Seq[SinkConfig] = topicList.map( conf => Utils.getKafkaSqlString (conf, sinkSql) topicConfigs. foreach(× => tEnv. executeSql(×.sql)) val hdfsConfig: Config = config.getConfig( path= "sink.hdfs") val hdfsSink: SinkConfig getHdfsSqlString(hdfsConfig) tEnv.executeSql(hdfsSink.sql)

Здесь у нас генерируется приемник, мы генерируем на основе Avro-схемы SQL и исполняем его здесь на месте. Потом генерируем sink. На основе схемы, которая должна получиться, на основе properties, которые нам необходимы, мы создаем наши sink’и в Kafka и в HDFS. Мы создаем их SQL и коннектимся к этой таблице, добываем из нее данные и делаем необходимые нам преобразования в табличном методе. Делаем преобразования по regexp, преобразуем все в массивы, ко всему прочему, csv может принимать массивы, так что здесь даже эта операция может быть лишней. Делаем несколько преобразований, здесь будут обогащения с Hive. Потом вставка данных, то есть фильтрация и вставка в необходимые Kafka Topics, а здесь для Hive мы создаем колонки для партиционирования и вставляем их в HDFS. 

Ко всему прочему, при вставке данных в HDFS мы можем здесь навешивать еще partition commit, то есть если здесь должна возникнуть новая партиция, то мы можем обозначить, какие действия должны выполняться при вставке в HDFS этих данных. То есть если данные вставились, то сразу создаем партицию. Не потребуется каких-то дополнительных ухищрений, просто воспользовались partition commit, отправили запрос в Hive, и все прекрасно. 

Hive умеет скатывать все наши файлы под определенный размер, по дефолту это 128 Мб, под размер блока в HDFS. То есть каждый процесс скатывает все свои данные в 128 Мб или в какое-то другое указанное значение, либо же скатывает его в какое-то время, то есть минуту, 30 секунд, полчаса или час. Но даже если этого не хватит, мы можем включить еще внутренний compaction, что он все данные, которые скатали процессы, скатает еще глубже, то есть все равно добьется нужного размера файла. Поэтому у Flink сходу из коробки есть compaction.

Также есть обогащение, поэтому у нас одно приложение сразу парсит из Kafka, сразу парсит всю схему, может обогатить данные, скатать их и положить в HDFS или другие места. Такое приложение нас как раз устраивает, то есть мы при помощи Table API превратили все наши четыре приложения в одно. При этом нам не нужно будет его перекомпилировать при изменении схемы, поэтому Table API является хорошей заменой для Spark Streaming.

Какие были проблемы

Первая проблема Flink — его обновляемость. Мы начинали работу Flink с Table API с версии 1.13, а сейчас уже идет версия 1.17. Да, он активно обновляется, но при этом много вещей уходит в deprecated, то есть вы разрабатываете на 1.13, потом переходите на 1.15, и тут у вас некоторые методы deprecated. Придется перелопачивать приложение, снова вглядываться в Flink, следить за ним и, возможно, обновлять его почаще. В общем, его нужно мониторить и постоянно изучать. 

Во-вторых, у Flink есть большая нехватка информации. Книги о нем, опять же, устаревают очень быстро, много фичей появляется, много чего не хватает при работе с Flink. Обычно, чтобы что-то изучить по Flink, мы берем его документацию и его код. Других мест нет. Есть опция, если вы знаете китайский язык, полазить на китайских форумах, потому что в Flink очень вкидываются китайцы. Если вы ищете коннекторы для Flink, то вы заходите на GitHub, и там куча иероглифов. Поэтому если вы знаете китайский, то у вас будет еще больше преимуществ при работе с Flink. Мы решаем этот вопрос так: собираемся раз в две недели с группой разработчиков и дата инженеров, которые работают с потоковой загрузкой, и общаемся по этой теме. Мы общаемся по теме Flink и его фич, нам рассказывают про мониторинг, про то, как подсоединиться к Hive и так далее. То есть решаем вопрос нехватки информации тем, что краудсорсим внутри себя.

Flink еще очень скуден по примерам. То есть примеров о том, как цепляться, как обогащать данные очень мало, приходится много экспериментировать. Поэтому если вы хотите перейти к Flink, готовьтесь к тому, что вам придется много чего попробовать, много как поизголяться. Знания достаются дорого по времени. Но, повторюсь, Flink нам очень помог, он перенимает на себя очень много вычислительных ресурсов, он в этом прекрасен. Он в этом удобнее, чем вся наша текущая структура, которая работает для потокового принятия данных. На этом у меня все, спасибо за внимание!


ссылка на оригинал статьи https://habr.com/ru/articles/863936/