Искусство ETL. Пишем собственный движок SQL на Spark [часть 7]

от автора

В предыдущих сериях (FAQ 1 2 3 4 5 6 ) мы весьма подробно рассмотрели, как написать на Java собственный интерпретатор объектно-ориентированного диалекта SQL поверх Spark RDD API, заточенный на задачи подготовки и трансформации наборов данных.

В данной части поговорим о том, как добавить в собственный диалект SQL поддержку процедур. Например,

-- library.tdl  CREATE PROCEDURE dwellTimeByMode(@signals, @target, @outPrefix,   @modes = ['pedestrian', 'non_pedestrian', 'car', 'bike'],   @groupid='cell10') AS BEGIN     LOOP $mode IN $modes BEGIN         SELECT * FROM $signals INTO "{$signals}/{$mode}" WHERE mode=$mode;          CALL dwellTime(@signals_userid_attr=userid,             @target_userid_attr=userid,             @target_grouping_attr=$groupid         ) INPUT signals FROM "{$signals}/{$mode}", target FROM $target         OUTPUT INTO "{$outPrefix}/{$mode}";          ANALYZE "{$signals}/{$mode}";         ANALYZE "{$outPrefix}/{$mode}";     END; END;  --- ... --- ... --- ... ---  -- script.tdl  CALL dwellTimeByMode(@signals=$this_month, @target=$population, @outPrefix=$this_month);

Нафига это надо?

Ну, допустим, у нас уже есть некоторое количество SQL ETL кода, наработанного за время эксплуатации инструмента в продакшене, и становится заметно, что значительная часть скриптов на разных проектах совпадает, и из раза в раз повторяется. Логично было бы вынести все эти совпадающие куски в библиотеку, чтобы держать в одном месте, да и вызывать с какими надо параметрами, когда надо. Вот прям как на примере выше.

Ну, если интерпретатор уже написан, то реализовать в нём процедры — это как нефиг делать. Очень простая задача.

Ведь что есть процедура с точки зрения интерпретатора?

Процедура с точки зрения интерпретатора — это кусок AST, который:

  • выдирается при первичном просмотре исходного кода,
  • не интерпретируется, откладывается куда-то под заданным именем,
  • многократно интерпретируется в моменты вызовов,
  • имеет свой дочерний контекст переменных,
  • переменные берутся из родительского контекста, и аугментируются параметрами процедуры.

И чем обработка процедуры отличается от контекста, например, цикла в таком случае? Вторым пунктом из списочка выше. AST цикла никуда не откладывается, и имени не имеет. А так, в нём ровно всё то же самое.

Это означает, что нам придётся завести какое-то место, куда можно откладывать кусок AST. А также, каким-то образом запоминать имена параметров, чтобы порождать переменные, соответствующие переданным в процедуру параметрам, в дочернем контексте при её вызове. Всего лишь, и дело в шляпе.

Но сначала стоит описать формальный синтаксис процедур для парсера SQL.

create_proc  : ( K_CREATE ( S_OR K_REPLACE )? )? K_PROCEDURE func      ( S_OPEN_PAR proc_param ( S_COMMA proc_param )* S_CLOSE_PAR )?      K_AS? K_BEGIN        statements      K_END K_PROCEDURE?  ;  proc_param  : param  | S_AT L_IDENTIFIER  ;  drop_proc  : K_DROP K_PROCEDURE func ( S_COMMA func )*  ;

А также расширить оператор CALL возможностью вызвать процедуру (которая у нас, в отличие от написанных на Java операций, исполняется в том же контексте, что и сам скрипт, и потому не требует отдельного указания INPUT и OUTPUT датасетов, а потребляет и создаёт их там напрямую).

call_stmt  : K_CALL func_expr operation_io  | K_CALL func_expr  ;

Таким образом, синтаксис — как и подразумевающаяся семантика — заданы. Согласно им, процедуру можно создать, заменить, и даже дропнуть. А параметры у неё могут как иметь значения по умолчанию (то есть, быть опциональными), так и не иметь оных (то есть, быть обязательными). Прекрасно, осталось всё это дело поддержать в интерпретаторе.

Ну и небольшой нюанс — раз у нас процедуры SQL вызываются той же синтаксической конструкцией, что и операции Java, причём по имени, то и живут они автоматически в том же пространстве имён. Значит, не забыть, что имя при создании надо проверять на занятость.

Итак. Давайте опишем метаданные самой процедуры.

public class Procedure {     @JsonIgnore     public final TDL4.StatementsContext ctx;      public final Map<String, Param> params;      private Procedure(TDL4.StatementsContext ctx, Map<String, Param> params) {         this.ctx = ctx;         this.params = params;     }      @JsonCreator     public Procedure(Map<String, Param> params) {         this.params = params;         this.ctx = null;     }      public static Builder builder(TDL4.StatementsContext ctx) {         return new Builder(ctx);     }      public static class Builder {         private final TDL4.StatementsContext ctx;         private final Map<String, Param> params = new HashMap<>();          private Builder(TDL4.StatementsContext ctx) {             this.ctx = ctx;         }          public Builder mandatory(String name) {             params.put(name, new Param());             return this;         }          public Builder optional(String name, Object value) {             params.put(name, new Param(value));             return this;         }          public Procedure build() {             return new Procedure(ctx, params);         }     }      public static class Param {         public final boolean optional;         public final Object defaults;          @JsonCreator         public Param(boolean optional, Object defaults) {             this.optional = optional;             this.defaults = defaults;         }          private Param(Object defaults) {             this.optional = true;             this.defaults = defaults;         }          private Param() {             this.optional = false;             this.defaults = null;         }     } }

Кажется, что кода много, но это Java. Даже если 17 версии, всё равно по-прежнему многословный язык. За то и люблю. Но тут ничего, кроме куска AST и мапы с параметрами. А теперь давайте положим их куда-нибудь, типа заведём библиотеку:

public class Library {     public final Map<String, Procedure> procedures = new HashMap<>(); }

… и подключим её в контекст интерпретатора:

public class TDL4Interpreter { //пропущено всё сейчас неважное      private final String script;      private DataContext dataContext;      private final OptionsContext options;      private VariablesContext variables;      private final TDL4ErrorListener errorListener;     private TDL4.ScriptContext scriptContext;      private final Library library;      public TDL4Interpreter(Library library, String script, VariablesContext variables, OptionsContext options, TDL4ErrorListener errorListener) {         this.library = library;         this.script = script;         this.variables = variables;         this.options = options;         this.errorListener = errorListener;     }  //пропущено всё сейчас неважное

Таким образом, создав один раз экземпляр Library, мы можем переиспользовать его столько раз, сколько экземпляров интерпретатора у нас вызывается (как и другие контексты, типа глобальных переменных, опций, и данных — то есть, сам Spark). Что крайне полезно при подключении каталога библиотечных скриптов, когда надо пропарсить кучу файлов, и собрать из них все процедуры.

Кстати, для этой цели стоит пропатчить обработчик командных ключей, чтобы он мог брать не один файл со скриптом, а сразу много. Что-то типа --script "s3://projects/commons/v2.0/lib/*.tdl,s3://projects/this_project/step25.tdl". К счастью, поддержка glob patterns в путях у нас уже есть, закопанная где-то в обёртке источника данных для классов Hadoop FileSystem, её и переиспользуем.

Выглядит это дело страшновато, конечно. Ещё один парсер в проекте, на этот раз потоковый посимвольный, да… Увы, но готовой библиотечки для разбора glob patterns на Java мне найти не удалось — по крайней мере, такой, чтобы она либо не умела слишком мало, либо не тащила с собой левые зависимости, так что пришлось писать самому (чего я терпеть не могу):

Получается под 200 строк кода

public class HadoopStorage {     public static final String PATH_PATTERN = "^([^:]+:/*[^/]+)/(.+)";      public static List<Tuple2<String, String>> pathToGroups(String inputPath) throws InvalidConfigurationException {         List<Tuple2<String, String>> ret = new ArrayList<>();          int curlyLevel = 0;          List<String> splits = new ArrayList<>();          StringBuilder current = new StringBuilder();         for (int i = 0; i < inputPath.length(); i++) {             char c = inputPath.charAt(i);              switch (c) {                 case '\\': {                     current.append(c).append(inputPath.charAt(++i));                     break;                 }                 case '{': {                     curlyLevel++;                     current.append(c);                     break;                 }                 case '}': {                     curlyLevel--;                     current.append(c);                     break;                 }                 case ',': {                     if (curlyLevel == 0) {                         splits.add(current.toString());                         current = new StringBuilder();                     } else {                         current.append(c);                     }                     break;                 }                 default: {                     current.append(c);                 }             }         }         splits.add(current.toString());          for (String split : splits) {             Matcher m = Pattern.compile(PATH_PATTERN).matcher(split);             if (m.matches()) {                 String rootPath = m.group(1);                 String path = m.group(2);                  List<String> transSubs = new ArrayList<>();                 int groupingSub = -1;                  String sub = path;                 int s = 0;                  nextSub:                 while (true) {                     StringBuilder translatedSub = new StringBuilder();                      curlyLevel = 0;                     boolean inSet = false;                     for (int i = 0; i < sub.length(); i++) {                         char c = sub.charAt(i);                          switch (c) {                             case '/': {                                 if (!inSet && (curlyLevel == 0)) {                                     transSubs.add(translatedSub.toString());                                      if (++i != sub.length()) {                                         s++;                                          sub = sub.substring(i);                                         continue nextSub;                                     } else {                                         break nextSub;                                     }                                 } else {                                     translatedSub.append(c);                                 }                                 break;                             }                             case '\\': {                                 translatedSub.append(c);                                 if (++i != sub.length()) {                                     translatedSub.append(sub.charAt(i));                                 }                                 break;                             }                             case '$':                             case '(':                             case ')':                             case '|':                             case '+': {                                 translatedSub.append('\\').append(c);                                 break;                             }                             case '{': {                                 curlyLevel++;                                 translatedSub.append("(?:");                                 if (groupingSub < 0) {                                     groupingSub = s - 1;                                 }                                 break;                             }                             case '}': {                                 if (curlyLevel > 0) {                                     curlyLevel--;                                     translatedSub.append(")");                                 } else {                                     translatedSub.append(c);                                 }                                 break;                             }                             case ',': {                                 translatedSub.append((curlyLevel > 0) ? '|' : c);                                 break;                             }                             case '?': {                                 translatedSub.append('.');                                 if (groupingSub < 0) {                                     groupingSub = s - 1;                                 }                                 break;                             }                             case '*': {                                 if ((i != (sub.length() - 1)) && (sub.charAt(i + 1) == '*')) {                                     translatedSub.append(".*");                                     i++;                                 } else {                                     translatedSub.append("[^/]*");                                 }                                 if (groupingSub < 0) {                                     groupingSub = s - 1;                                 }                                 break;                             }                             case '[': {                                 inSet = true;                                 translatedSub.append(c);                                 if (groupingSub < 0) {                                     groupingSub = s - 1;                                 }                                 break;                             }                             case '^': {                                 if (inSet) {                                     translatedSub.append('\\');                                 }                                 translatedSub.append(c);                                 break;                             }                             case '!': {                                 translatedSub.append(inSet && ('[' == sub.charAt(i - 1)) ? '^' : '!');                                 break;                             }                             case ']': {                                 inSet = false;                                 translatedSub.append(c);                                 break;                             }                             default: {                                 translatedSub.append(c);                             }                         }                     }                      if (inSet || (curlyLevel > 0)) {                         throw new InvalidConfigurationException("Glob pattern '" + split + "' contains unbalances range [] or braces {} definition");                     }                      if (groupingSub < 0) {                         groupingSub = s;                     }                      transSubs.add(translatedSub.toString());                      break;                 }                  if (s < 1) {                     groupingSub = 0;                 }                  String groupSub = transSubs.get(groupingSub);                  transSubs.remove(groupingSub);                 transSubs.add(groupingSub, "(" + groupSub + ")");                  rootPath += "/" + StringUtils.join(transSubs.subList(0, groupingSub), '/');                 ret.add(new Tuple2<>(                         rootPath + "/" + groupSub,                         ".*/" + StringUtils.join(transSubs.subList(groupingSub, transSubs.size()), '/') + ".*"                 ));             } else {                 throw new InvalidConfigurationException("Glob pattern '" + split + "' must have protocol specification and its first path part must be not a grouping candidate");             }         }          return ret;     } }

Нууууу, с подготовкой вроде всё.

Теперь сам кусок интерпретатора, который бы создавал, удалял, и — главное — исполнял процедуры, получается донельзя тривиальным.

    private void createProcedure(TDL4.Create_procContext ctx) {         String procName = resolveName(ctx.func().L_IDENTIFIER());          if (Operations.OPERATIONS.containsKey(procName)) {             throw new InvalidConfigurationException("Attempt to CREATE PROCEDURE which overrides OPERATION \"" + procName + "\"");         }          if ((ctx.K_REPLACE() == null) && library.procedures.containsKey(procName)) {             throw new InvalidConfigurationException("PROCEDURE " + procName + " has already been defined. Offending definition at line " + ctx.K_CREATE().getSymbol().getLine());         }          Procedure.Builder proc = Procedure.builder(ctx.statements());         for (TDL4.Proc_paramContext procParam : ctx.proc_param()) {             if (procParam.param() == null) {                 proc.mandatory(resolveName(procParam.L_IDENTIFIER()));             } else {                 proc.optional(resolveName(procParam.param().L_IDENTIFIER()), Expressions.evalLoose(expression(procParam.param().attr_expr().children, ExpressionRules.LET), variables));             }         }         library.procedures.put(procName, proc.build());     }      private void dropProcedure(TDL4.Drop_procContext ctx) {         for (TDL4.FuncContext func : ctx.func()) {             String procName = resolveName(func.L_IDENTIFIER());              library.procedures.remove(procName);         }     }      private void call(TDL4.Call_stmtContext ctx) {         TDL4.Func_exprContext funcExpr = ctx.func_expr();          String verb = resolveName(funcExpr.func().L_IDENTIFIER());         Map<String, Object> params = resolveParams(funcExpr.params_expr());         if (ctx.operation_io() != null) {             if (!Operations.OPERATIONS.containsKey(verb)) {                 throw new InvalidConfigurationException("CALL \"" + verb + "\"() refers to unknown Operation");             }              callOperation(verb, params, ctx.operation_io());         } else {             if (!library.procedures.containsKey(verb)) {                 throw new InvalidConfigurationException("CALL \"" + verb + "\"() refers to undefined PROCEDURE");             }              callProcedure(verb, params);         }     }      private void callProcedure(String procName, Map<String, Object> params) {         if (verbose) {             System.out.println("CALLing PROCEDURE " + procName + " with params " + params + "\n");         }          Procedure proc = library.procedures.get(procName);          for (Map.Entry<String, Procedure.Param> defEntry : proc.params.entrySet()) {             String name = defEntry.getKey();             Procedure.Param paramDef = defEntry.getValue();              if (!paramDef.optional && !params.containsKey(name)) {                 throw new InvalidConfigurationException("PROCEDURE " + procName + " CALL must have mandatory parameter @" + name + " set");             }         }          variables = new VariablesContext(variables);         variables.putAll(params);          for (TDL4.StatementContext stmt : proc.ctx.statement()) {             statement(stmt);         }          variables = variables.parent;     }

Как и было сказано в начале, при разборе мы тупо выдираем кусок AST, и складываем его в Library, добавляя описание параметров. А при вызове превращаем переданные параметры в переменные дочернего контекста, и исполняем запомненный кусок AST с этим контекстом.

Что ещё?

Ну, у нас есть REPL, например, в котором есть команды \SHOW и \DESCRIBE. Для них нужно добавить соответствующие подкоманды \SHOW PROCEDURE и \DESCRIBE PROCEDURE. но это настолько тривиальная задача, что мне лень её описывать. Ну, в команду \SCRIPT ещё надо добавить поддержку glob patterns. Но это тоже до жути тривиально, и занимает пять минут.

В полном объёме посмотреть на все описанные сегодня изменения можно в соответствующем пулл реквесте.

Таким образом, когда у тебя уже есть какой-то интерпретатор, то расширять его новыми фичами — одно удовольствие. Другое дело, чтобы написать такой интерпретатор, который легко расширяется, нужно сначала планировать язык и контексты где-то около года. Чего всем и советую.

— Тааааак, а погодите-ка! У нас есть контексты переменных, есть процедуры, в интерпретаторе есть даже пекеджы — и что нам теперь мешает запилить классы с объектами?!
— Ничего не мешает, только здравый смысл. В ETL SQL классы не нужны. Я не настолько наркоман, чтобы запилить их даже ради шутки. Но много времени подобное упражнение не займёт…

Промка: https://pastorgl.github.io/datacooker-etl
Исходники: https://github.com/PastorGL/datacooker-etl
Официальная группа в телеге: https://t.me/data_cooker_etl


ссылка на оригинал статьи https://habr.com/ru/articles/838034/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *