В предыдущих сериях (FAQ • 1 • 2 • 3 • 4 • 5 • 6 ) мы весьма подробно рассмотрели, как написать на Java собственный интерпретатор объектно-ориентированного диалекта SQL поверх Spark RDD API, заточенный на задачи подготовки и трансформации наборов данных.
В данной части поговорим о том, как добавить в собственный диалект SQL поддержку процедур. Например,
-- library.tdl CREATE PROCEDURE dwellTimeByMode(@signals, @target, @outPrefix, @modes = ['pedestrian', 'non_pedestrian', 'car', 'bike'], @groupid='cell10') AS BEGIN LOOP $mode IN $modes BEGIN SELECT * FROM $signals INTO "{$signals}/{$mode}" WHERE mode=$mode; CALL dwellTime(@signals_userid_attr=userid, @target_userid_attr=userid, @target_grouping_attr=$groupid ) INPUT signals FROM "{$signals}/{$mode}", target FROM $target OUTPUT INTO "{$outPrefix}/{$mode}"; ANALYZE "{$signals}/{$mode}"; ANALYZE "{$outPrefix}/{$mode}"; END; END; --- ... --- ... --- ... --- -- script.tdl CALL dwellTimeByMode(@signals=$this_month, @target=$population, @outPrefix=$this_month);
Нафига это надо?
Ну, допустим, у нас уже есть некоторое количество SQL ETL кода, наработанного за время эксплуатации инструмента в продакшене, и становится заметно, что значительная часть скриптов на разных проектах совпадает, и из раза в раз повторяется. Логично было бы вынести все эти совпадающие куски в библиотеку, чтобы держать в одном месте, да и вызывать с какими надо параметрами, когда надо. Вот прям как на примере выше.
Ну, если интерпретатор уже написан, то реализовать в нём процедры — это как нефиг делать. Очень простая задача.
Ведь что есть процедура с точки зрения интерпретатора?
Процедура с точки зрения интерпретатора — это кусок AST, который:
- выдирается при первичном просмотре исходного кода,
- не интерпретируется, откладывается куда-то под заданным именем,
- многократно интерпретируется в моменты вызовов,
- имеет свой дочерний контекст переменных,
- переменные берутся из родительского контекста, и аугментируются параметрами процедуры.
И чем обработка процедуры отличается от контекста, например, цикла в таком случае? Вторым пунктом из списочка выше. AST цикла никуда не откладывается, и имени не имеет. А так, в нём ровно всё то же самое.
Это означает, что нам придётся завести какое-то место, куда можно откладывать кусок AST. А также, каким-то образом запоминать имена параметров, чтобы порождать переменные, соответствующие переданным в процедуру параметрам, в дочернем контексте при её вызове. Всего лишь, и дело в шляпе.
Но сначала стоит описать формальный синтаксис процедур для парсера SQL.
create_proc : ( K_CREATE ( S_OR K_REPLACE )? )? K_PROCEDURE func ( S_OPEN_PAR proc_param ( S_COMMA proc_param )* S_CLOSE_PAR )? K_AS? K_BEGIN statements K_END K_PROCEDURE? ; proc_param : param | S_AT L_IDENTIFIER ; drop_proc : K_DROP K_PROCEDURE func ( S_COMMA func )* ;
А также расширить оператор CALL
возможностью вызвать процедуру (которая у нас, в отличие от написанных на Java операций, исполняется в том же контексте, что и сам скрипт, и потому не требует отдельного указания INPUT
и OUTPUT
датасетов, а потребляет и создаёт их там напрямую).
call_stmt : K_CALL func_expr operation_io | K_CALL func_expr ;
Таким образом, синтаксис — как и подразумевающаяся семантика — заданы. Согласно им, процедуру можно создать, заменить, и даже дропнуть. А параметры у неё могут как иметь значения по умолчанию (то есть, быть опциональными), так и не иметь оных (то есть, быть обязательными). Прекрасно, осталось всё это дело поддержать в интерпретаторе.
Ну и небольшой нюанс — раз у нас процедуры SQL вызываются той же синтаксической конструкцией, что и операции Java, причём по имени, то и живут они автоматически в том же пространстве имён. Значит, не забыть, что имя при создании надо проверять на занятость.
Итак. Давайте опишем метаданные самой процедуры.
public class Procedure { @JsonIgnore public final TDL4.StatementsContext ctx; public final Map<String, Param> params; private Procedure(TDL4.StatementsContext ctx, Map<String, Param> params) { this.ctx = ctx; this.params = params; } @JsonCreator public Procedure(Map<String, Param> params) { this.params = params; this.ctx = null; } public static Builder builder(TDL4.StatementsContext ctx) { return new Builder(ctx); } public static class Builder { private final TDL4.StatementsContext ctx; private final Map<String, Param> params = new HashMap<>(); private Builder(TDL4.StatementsContext ctx) { this.ctx = ctx; } public Builder mandatory(String name) { params.put(name, new Param()); return this; } public Builder optional(String name, Object value) { params.put(name, new Param(value)); return this; } public Procedure build() { return new Procedure(ctx, params); } } public static class Param { public final boolean optional; public final Object defaults; @JsonCreator public Param(boolean optional, Object defaults) { this.optional = optional; this.defaults = defaults; } private Param(Object defaults) { this.optional = true; this.defaults = defaults; } private Param() { this.optional = false; this.defaults = null; } } }
Кажется, что кода много, но это Java. Даже если 17 версии, всё равно по-прежнему многословный язык. За то и люблю. Но тут ничего, кроме куска AST и мапы с параметрами. А теперь давайте положим их куда-нибудь, типа заведём библиотеку:
public class Library { public final Map<String, Procedure> procedures = new HashMap<>(); }
… и подключим её в контекст интерпретатора:
public class TDL4Interpreter { //пропущено всё сейчас неважное private final String script; private DataContext dataContext; private final OptionsContext options; private VariablesContext variables; private final TDL4ErrorListener errorListener; private TDL4.ScriptContext scriptContext; private final Library library; public TDL4Interpreter(Library library, String script, VariablesContext variables, OptionsContext options, TDL4ErrorListener errorListener) { this.library = library; this.script = script; this.variables = variables; this.options = options; this.errorListener = errorListener; } //пропущено всё сейчас неважное
Таким образом, создав один раз экземпляр Library
, мы можем переиспользовать его столько раз, сколько экземпляров интерпретатора у нас вызывается (как и другие контексты, типа глобальных переменных, опций, и данных — то есть, сам Spark). Что крайне полезно при подключении каталога библиотечных скриптов, когда надо пропарсить кучу файлов, и собрать из них все процедуры.
Кстати, для этой цели стоит пропатчить обработчик командных ключей, чтобы он мог брать не один файл со скриптом, а сразу много. Что-то типа --script "s3://projects/commons/v2.0/lib/*.tdl,s3://projects/this_project/step25.tdl"
. К счастью, поддержка glob patterns в путях у нас уже есть, закопанная где-то в обёртке источника данных для классов Hadoop FileSystem, её и переиспользуем.
Выглядит это дело страшновато, конечно. Ещё один парсер в проекте, на этот раз потоковый посимвольный, да… Увы, но готовой библиотечки для разбора glob patterns на Java мне найти не удалось — по крайней мере, такой, чтобы она либо не умела слишком мало, либо не тащила с собой левые зависимости, так что пришлось писать самому (чего я терпеть не могу):
public class HadoopStorage { public static final String PATH_PATTERN = "^([^:]+:/*[^/]+)/(.+)"; public static List<Tuple2<String, String>> pathToGroups(String inputPath) throws InvalidConfigurationException { List<Tuple2<String, String>> ret = new ArrayList<>(); int curlyLevel = 0; List<String> splits = new ArrayList<>(); StringBuilder current = new StringBuilder(); for (int i = 0; i < inputPath.length(); i++) { char c = inputPath.charAt(i); switch (c) { case '\\': { current.append(c).append(inputPath.charAt(++i)); break; } case '{': { curlyLevel++; current.append(c); break; } case '}': { curlyLevel--; current.append(c); break; } case ',': { if (curlyLevel == 0) { splits.add(current.toString()); current = new StringBuilder(); } else { current.append(c); } break; } default: { current.append(c); } } } splits.add(current.toString()); for (String split : splits) { Matcher m = Pattern.compile(PATH_PATTERN).matcher(split); if (m.matches()) { String rootPath = m.group(1); String path = m.group(2); List<String> transSubs = new ArrayList<>(); int groupingSub = -1; String sub = path; int s = 0; nextSub: while (true) { StringBuilder translatedSub = new StringBuilder(); curlyLevel = 0; boolean inSet = false; for (int i = 0; i < sub.length(); i++) { char c = sub.charAt(i); switch (c) { case '/': { if (!inSet && (curlyLevel == 0)) { transSubs.add(translatedSub.toString()); if (++i != sub.length()) { s++; sub = sub.substring(i); continue nextSub; } else { break nextSub; } } else { translatedSub.append(c); } break; } case '\\': { translatedSub.append(c); if (++i != sub.length()) { translatedSub.append(sub.charAt(i)); } break; } case '$': case '(': case ')': case '|': case '+': { translatedSub.append('\\').append(c); break; } case '{': { curlyLevel++; translatedSub.append("(?:"); if (groupingSub < 0) { groupingSub = s - 1; } break; } case '}': { if (curlyLevel > 0) { curlyLevel--; translatedSub.append(")"); } else { translatedSub.append(c); } break; } case ',': { translatedSub.append((curlyLevel > 0) ? '|' : c); break; } case '?': { translatedSub.append('.'); if (groupingSub < 0) { groupingSub = s - 1; } break; } case '*': { if ((i != (sub.length() - 1)) && (sub.charAt(i + 1) == '*')) { translatedSub.append(".*"); i++; } else { translatedSub.append("[^/]*"); } if (groupingSub < 0) { groupingSub = s - 1; } break; } case '[': { inSet = true; translatedSub.append(c); if (groupingSub < 0) { groupingSub = s - 1; } break; } case '^': { if (inSet) { translatedSub.append('\\'); } translatedSub.append(c); break; } case '!': { translatedSub.append(inSet && ('[' == sub.charAt(i - 1)) ? '^' : '!'); break; } case ']': { inSet = false; translatedSub.append(c); break; } default: { translatedSub.append(c); } } } if (inSet || (curlyLevel > 0)) { throw new InvalidConfigurationException("Glob pattern '" + split + "' contains unbalances range [] or braces {} definition"); } if (groupingSub < 0) { groupingSub = s; } transSubs.add(translatedSub.toString()); break; } if (s < 1) { groupingSub = 0; } String groupSub = transSubs.get(groupingSub); transSubs.remove(groupingSub); transSubs.add(groupingSub, "(" + groupSub + ")"); rootPath += "/" + StringUtils.join(transSubs.subList(0, groupingSub), '/'); ret.add(new Tuple2<>( rootPath + "/" + groupSub, ".*/" + StringUtils.join(transSubs.subList(groupingSub, transSubs.size()), '/') + ".*" )); } else { throw new InvalidConfigurationException("Glob pattern '" + split + "' must have protocol specification and its first path part must be not a grouping candidate"); } } return ret; } }
Нууууу, с подготовкой вроде всё.
Теперь сам кусок интерпретатора, который бы создавал, удалял, и — главное — исполнял процедуры, получается донельзя тривиальным.
private void createProcedure(TDL4.Create_procContext ctx) { String procName = resolveName(ctx.func().L_IDENTIFIER()); if (Operations.OPERATIONS.containsKey(procName)) { throw new InvalidConfigurationException("Attempt to CREATE PROCEDURE which overrides OPERATION \"" + procName + "\""); } if ((ctx.K_REPLACE() == null) && library.procedures.containsKey(procName)) { throw new InvalidConfigurationException("PROCEDURE " + procName + " has already been defined. Offending definition at line " + ctx.K_CREATE().getSymbol().getLine()); } Procedure.Builder proc = Procedure.builder(ctx.statements()); for (TDL4.Proc_paramContext procParam : ctx.proc_param()) { if (procParam.param() == null) { proc.mandatory(resolveName(procParam.L_IDENTIFIER())); } else { proc.optional(resolveName(procParam.param().L_IDENTIFIER()), Expressions.evalLoose(expression(procParam.param().attr_expr().children, ExpressionRules.LET), variables)); } } library.procedures.put(procName, proc.build()); } private void dropProcedure(TDL4.Drop_procContext ctx) { for (TDL4.FuncContext func : ctx.func()) { String procName = resolveName(func.L_IDENTIFIER()); library.procedures.remove(procName); } } private void call(TDL4.Call_stmtContext ctx) { TDL4.Func_exprContext funcExpr = ctx.func_expr(); String verb = resolveName(funcExpr.func().L_IDENTIFIER()); Map<String, Object> params = resolveParams(funcExpr.params_expr()); if (ctx.operation_io() != null) { if (!Operations.OPERATIONS.containsKey(verb)) { throw new InvalidConfigurationException("CALL \"" + verb + "\"() refers to unknown Operation"); } callOperation(verb, params, ctx.operation_io()); } else { if (!library.procedures.containsKey(verb)) { throw new InvalidConfigurationException("CALL \"" + verb + "\"() refers to undefined PROCEDURE"); } callProcedure(verb, params); } } private void callProcedure(String procName, Map<String, Object> params) { if (verbose) { System.out.println("CALLing PROCEDURE " + procName + " with params " + params + "\n"); } Procedure proc = library.procedures.get(procName); for (Map.Entry<String, Procedure.Param> defEntry : proc.params.entrySet()) { String name = defEntry.getKey(); Procedure.Param paramDef = defEntry.getValue(); if (!paramDef.optional && !params.containsKey(name)) { throw new InvalidConfigurationException("PROCEDURE " + procName + " CALL must have mandatory parameter @" + name + " set"); } } variables = new VariablesContext(variables); variables.putAll(params); for (TDL4.StatementContext stmt : proc.ctx.statement()) { statement(stmt); } variables = variables.parent; }
Как и было сказано в начале, при разборе мы тупо выдираем кусок AST, и складываем его в Library
, добавляя описание параметров. А при вызове превращаем переданные параметры в переменные дочернего контекста, и исполняем запомненный кусок AST с этим контекстом.
Что ещё?
Ну, у нас есть REPL, например, в котором есть команды \SHOW
и \DESCRIBE
. Для них нужно добавить соответствующие подкоманды \SHOW PROCEDURE
и \DESCRIBE PROCEDURE
. но это настолько тривиальная задача, что мне лень её описывать. Ну, в команду \SCRIPT
ещё надо добавить поддержку glob patterns. Но это тоже до жути тривиально, и занимает пять минут.
В полном объёме посмотреть на все описанные сегодня изменения можно в соответствующем пулл реквесте.
Таким образом, когда у тебя уже есть какой-то интерпретатор, то расширять его новыми фичами — одно удовольствие. Другое дело, чтобы написать такой интерпретатор, который легко расширяется, нужно сначала планировать язык и контексты где-то около года. Чего всем и советую.
— Тааааак, а погодите-ка! У нас есть контексты переменных, есть процедуры, в интерпретаторе есть даже пекеджы — и что нам теперь мешает запилить классы с объектами?!
— Ничего не мешает, только здравый смысл. В ETL SQL классы не нужны. Я не настолько наркоман, чтобы запилить их даже ради шутки. Но много времени подобное упражнение не займёт…
Промка: https://pastorgl.github.io/datacooker-etl
Исходники: https://github.com/PastorGL/datacooker-etl
Официальная группа в телеге: https://t.me/data_cooker_etl
ссылка на оригинал статьи https://habr.com/ru/articles/838034/
Добавить комментарий