Пишем движок SQL на Spark. Часть 8: CREATE FUNCTION

от автора

В предыдущих сериях ( 1 2 3 4 5 6 7 Ы ) рассмотрели, как написать на Java собственный интерпретатор объектно-ориентированного диалекта SQL, заточенный на задачи подготовки и трансформации наборов данных, и работающий как тонкая прослойка поверх Spark RDD API.

Штука получилась довольно продвинутая, с поддержкой императивщины типа циклов/ветвлений/переменных, и даже с поддержкой пользовательских процедур. И в плане этой самой императивщины расширяемая: может импортировать функции из Java classpath, равно как и операторы выражений. То есть, если необходимо, можно написать функцию на Java, или определить новый оператор, и использовать потом в любом выражении на SQL.

Круто? Ещё как круто. Но как-то однобоко. Если в языке у нас поддерживаются функции, то почему бы не дать нашим пользователям определять их самостоятельно? Вот прямо через CREATE FUNCTION? Тем более, что вся необходимая для этого инфраструктура уже вовсю присутствует. Да и процедуры на уровне интерпретатора у нас уже поддерживаются ведь…

Функция для затравки.

Но на самом деле не так всё просто.

Если обратиться к предыдущей части, то выяснится, что процедура на SQL — это кусок AST, грубо выдернутый из исходника и каждый раз интерпретируемый заново с новым контекстом переменных. А импортируемая из Java classpath функция дёргается путём прямого вызова «синглтона» соответствующего «функционального» класса с передачей ему параметров.

Для нужд пользовательских функций оба эти подхода не подходят. Интерпретация AST штука крайне медленная, и если функцию надо вызвать для каждой из многих миллионов записей в датасете, например, в списке выражений SELECT, то она будет тормозить как не в себя.

На примере:

CREATE FUNCTION polyDescr() RECORD AS    RETURN POLY_VERTICES() || ' vertices, area: ' || POLY_AREA || 'sq.m, perimeter: '        || POLY_PERIMETER() || 'm, ' || POLY_HOLES() || ' holes';  SELECT name, category, polyDescr() AS descr FROM polygons INTO described_polygons;

Если полигонов в датасете пара сотен тыщ, то пробегать по AST функции polyDescr() эти самые пару сотен тыщ раз будет несколько, кгхмм… накладно.

Для процедур-то такое терпимо, потому что они вызываются на уроне скрипта глобально, и не так часто (и вообще, цикломатика у нас в интерпретаторе контролируется, и по умолчанию на злоупотребления будет ругаться и требовать явного указания глубины вложенности циклов и максимального количества повторов).

Значит, репарсинг AST не прокатывает. Какие есть ещё варианты? Скомпилировать код функции на SQL в класс Java, и вызывать его напрямую, как это делается с функциями из classpath?

Ну, для такого придётся использовать кодогенерацию в байткод Java.

Только я вам сразу скажу — кодогенерация это ОТСТОЙ.

Генерировать байткод для JVM напрямую — противно, неудобно, и чревато целой уймой всяких fringe cases, которые фиг предусмотришь. И по моему собственному опыту (который насчитывает уже 25 лет), если использовать что-нибудь типа классическое, типа библиотеки javassist — это вообще лучший способ увязнуть в разработке если не навечно, то надолго, и результат будет не то что бы удовлетворительный. Оно всё древнее, следовательно, весьма поганенькое в плане юзабилити, а соответствующий пропозал для современного API кодогенерации всё ещё остаётся на уровне пропозала, и доберётся до продакшена в какой-нибудь Java 30. Что-то больно долго ждать…

Даже если по(д)смотреть, как этсамая кодогенерация реализована на уровне того же Spark SQL, то там авторы явно испытывают такое же отвращение к классической Java кодогенерации. Но их решение ИМХО ещё менее вменяемое: в коде на Scala генерируется исходный текст кода на Java, и пропускается через JANINO — игрушечный компилятор в байткод. Быстрый, но с поддержкой Java на уровне 7 версии, и то крайне кривенькой и ущербной. И вот эти скомпилированные классы подгружаются в текущую JVM, откуда и дёргаются. Дикое извращение, просто потому что так всё равно проще и предсказуемее, чем по классике писать инструкции в байткод напрямую.

Поэтому мы не будем использовать кодогенерацию ни в каком виде.

Мы выберем этакий средний путь: интерпретатор, но не по AST, а по предварительно подготовленной объектной модели. По производительности он будет лишь чуть-чуть медленнее честной кодогенерации (потому что отлично JIT-ится), зато по сложности имплементации куда проще. Тем более, что у нас уже есть опыт: выражения у нас уже вычисляются простенькой стековой виртуальной машиной, и мы спокойно можем добавить ещё одну, но заточенную уже не для формул, а для императивщины.

Вопрос тут в чём — насколько обширное подмножество языка будет поддерживаться в пользовательских функциях. Императивщина (сиречь LET/IF/LOOP) явно нужна вся, а кроме неё?

Ну, начнём, а там видно будет.

Для начала, определим синтаксис для CREATE FUNCTION в ANTLR (парсер у нас на нём):

create_func  : ( K_CREATE ( S_OR K_REPLACE )? )? K_FUNCTION func ( S_OPEN_PAR proc_param ( S_COMMA proc_param )* S_CLOSE_PAR )?   K_RECORD?   K_AS? ( K_RETURN? expression | K_BEGIN func_stmts K_END K_FUNCTION? )  ;

В переводе на чуть более человекообразный, это у нас

[CREATE [OR REPLACE]] FUNCTION function_name[([parameters])] [RECORD] [AS]     RETURN expression; [CREATE [OR REPLACE]] FUNCTION function_name[([parameters])] [RECORD] [AS] BEGIN     control_flow_statements... END [FUNCTION];

То есть, функцию можно определить либо как именованную формулу, которая делает RETURN результата выражения, в котором используется переменные, созданные из формальных параметров, либо как именованный блок императивного кода, заданного между BEGIN и END, и в котором может быть несколько RETURN в разных ветках. Точнее, каждая ветка должна оканчиваться своим RETURN.

То есть, нужен оператор RETURN, имеющий смысла только в контексте тела функции. Какие ещё нюансы надо предусмотреть?

Например, ключевое слово RECORD. Оно означает, что функция будет иметь доступ к полям записи, если используется в контексте SELECT, который итерируется по всем записям датасета (как в примере с polyDescr()). Без этого ключевого слова функция может быть использована и вне контекста SELECT, например, просто в верхнеуровневом коде. Прямо как та самая daysPerMonth() с картинки из шапки поста:

CREATE FUNCTION daysPerMonth(@year, @month) AS BEGIN    IF $month IN [4,6,9,11] THEN RETURN 30; END;    IF $month == 2 THEN       IF ($year % 400 == 0) OR ($year % 4 == 0) AND ($year % 100 <> 0) THEN RETURN 29;       ELSE RETURN 28; END;    END;    RETURN 31; END;

Тут у нас доступа к полям записей нет, и мы можем дёргать эту функцию из любого контекста. А если спецификатор RECORD присутствует, то надо как-то неявно (например, через стек) передавать соответствующую запись и её ключ, чтобы внутри можно было использовать встроенные функции, которые позволяют добраться до ключа текущей записи, или копаться внутри её объекта.

Но вроде пока бы ничего сложного. Попробуем имплементировать?

Попробуем. Но сначала в язык придётся добавить исключения. Потому что с пользовательскими функциями очень логично добавляется такая бизнесовая функциональность, как проверка параметров на валидность, например. Что делать, если в daysPerMonth() передан 25-й месяц? Хорошо было бы выкинуть ошибку, обозвать пользователя нехорошими словами, или сделать что-то подобное.

Окей, нам теперь точно нужна возможность аварийно завершить выполнение с выдачей сообщения об ошибке. Ну или хотя бы возможность написать сообщение в лог.

Прикидываемся Ораклом, и добавляем оператор языка RAISE:

raise_stmt  : K_RAISE T_MSGLVL? expression  ;

Или чуть более развёрнуто,

RAISE [Level] 'Сообщение об исключительной ситуации';

Где Level у нас INFO (сообщение пишется в stdout), WARNING (в stderr), или ERROR (не только отругаться, но сразу аварийно завершиться). Плюс синонимы, такие как DEBUG для INFO и EXCEPTION для ERROR.

Добавим в функцию:

CREATE FUNCTION daysPerMonth(@year, @month) AS BEGIN    IF $month NOT IN RANGE[1,12] THEN RAISE ERROR 'Invalid month #'  || $month; END;    IF $month IN [4,6,9,11] THEN RETURN 30; END;    IF $month == 2 THEN       IF ($year % 400 == 0) OR ($year % 4 == 0) AND ($year % 100 <> 0) THEN RETURN 29;       ELSE RETURN 28; END;    END;    RETURN 31; END;

Итого, для пользовательских функций у нас получается необходимым и достаточным следующий список операторов:

enum Statement {   LET, // — определить переменную,   LOOP, // — итератор по массиву,   IF, // — ветвление,   RETURN, // — возврат значения (новый),   RAISE; // — выброс исключения (новый). }

Всего-то пять. Это совсем немного, и соответствующий стековый (и, понятное дело, рекурсивный) конечный автомат получается весьма компактным:

private static class CallContext {     private final Object key;     private final DataRecord<?> rec;      boolean returnReached = false;     Object returnValue = null;      public CallContext(Object key, DataRecord<?> rec) {         this.key = key;         this.rec = rec;     }      void eval(List<StatementItem> items, VariablesContext vc) {         for (StatementItem fi : items) {             if (returnReached) {                 return;             }              switch (fi.statement) {                 case RETURN: {                     returnValue = Expressions.eval(key, rec, fi.expression, vc);                     returnReached = true;                     return;                 }                 case LET: {                     vc.put(fi.control, Expressions.eval(key, rec, fi.expression, vc));                     break;                 }                 case IF: {                     if (Expressions.bool(key, rec, fi.expression, vc)) {                         eval(fi.mainBranch, vc);                     } else {                         if (fi.elseBranch != null) {                             eval(fi.elseBranch, vc);                         }                     }                     break;                 }                 case LOOP: {                     Object expr = Expressions.eval(key, rec, fi.expression, vc);                     boolean loop = expr != null;                      Object[] loopValues = null;                     if (loop) {                         loopValues = new ArrayWrap(expr).data();                          loop = loopValues.length > 0;                     }                      if (loop) {                         VariablesContext vvc = new VariablesContext(vc);                         for (Object loopValue : loopValues) {                             if (returnReached) {                                 return;                             }                              vvc.put(fi.control, loopValue);                             eval(fi.mainBranch, vvc);                         }                     } else {                         if (fi.elseBranch != null) {                             eval(fi.elseBranch, vc);                         }                     }                     break;                 }                 case RAISE: {                     Object msg = Expressions.eval(key, rec, fi.expression, vc);                      switch (MsgLvl.get(fi.control)) {                         case INFO -> System.out.println(msg);                         case WARNING -> System.err.println(msg);                         default -> {                             returnReached = true;                             throw new RaiseException(String.valueOf(msg));                         }                     }                     break;                 }             }         }     } }

Что тут происходит?

Всё просто. В экземпляр CallContext при его создании может попасть запись датасета вместе со своим ключом (в случае функции, определённой со спецификатором RECORD), либо null-ы в обоих этих полях, если функция глобального контекста.

Поле returnReached нужно для того, чтобы в вызывающей логике можно было определить, что надо либо возвращать значение, которое было ранее положено в returnValue (по умолчанию null), либо выкидывать исключение, потому операторы закончились, а в соответствующей ветке так и не было встречено RETURN.

Ну, а стек операторов и контекст переменных текущего вызова передаются прямиком в eval(), который и проходит по нему от начала до конца. (Вообще говоря, стек операторов передавать на каждый вызов несколько избыточно — функция у нас штука иммутабельная, и от вызова к вызову не будет меняться. Но это такой задел на будущее на самом деле. А то мало ли вдруг захочется кешировать результаты, и что там ещё обычно делают SQL движки в целях оптимизации 🙂

«Экземпляр оператора» определён в общем виде следующим образом:

public static class StatementItem implements Serializable {     final Statement statement;     final String control;     final List<Expressions.ExprItem<?>> expression;     final List<StatementItem> mainBranch;     final List<StatementItem> elseBranch;      private StatementItem(Statement statement, String control, List<Expressions.ExprItem<?>> expression, List<StatementItem> mainBranch, List<StatementItem> elseBranch) {         this.statement = statement;         this.control = control;         this.expression = expression;         this.mainBranch = mainBranch;         this.elseBranch = elseBranch;     }      @Override     public String toString() {         return statement.name() + ((control != null) ? " $" + control : "");     } }

Statement это тот самый enum.

В поле control складывается либо имя управляющей переменной для LOOP/LET, либо уровень для RAISE.

В expression попадает управляющее выражение. Оно есть в каждом операторе. Для IF это буль, для LOOP массив, для LET собственно формула переменной, а для RETURN/RAISE выражение результата.

Рекурсивные списки mainBranch и elseBranch имеют смысл только для IF и LOOP (да, в LOOP у нас тоже есть опциональное ELSE, выполняемое, если в итерируемом массиве ноль элементов).

Для каждого из StatementItem можно сделать простенький builder function, равно как и для экземпляра функции целиком, но код там настолько тривиальный, что я его цитировать не буду.

Сама же функция-обёртка наследуется от того же самого объекта, описывающего функции, импортируемые из classpath. Для функции уровня RECORD она выглядит таким образом (а без него ещё проще):

private static class RecordFunction extends Function.WholeRecord<Object, DataRecord<?>> {     protected final String name;     protected final String descr;     protected final ListOrderedMap<String, Param> params;     protected final List<StatementItem> items;     protected final VariablesContext vc;      public RecordFunction(String name, String descr, ListOrderedMap<String, Param> params,                           List<StatementItem> items, VariablesContext vc) {         this.name = name;         this.descr = descr;         this.params = params;         this.items = items;         this.vc = vc;     }      @Override     public String name() {         return name;     }      @Override     public String descr() {         return descr;     }      @Override     public Object call(Deque<Object> args) {         VariablesContext thisCall = new VariablesContext(vc);         Object key = args.pop();         DataRecord<?> rec = (DataRecord<?>) args.pop();         for (int i = 0; i < params.size(); i++) {             Object a = args.pop();             thisCall.put(params.get(i), (a == null) ? params.getValue(i).defaults : a);         }          CallContext cc = new CallContext(key, rec);         cc.eval(items, thisCall);         if (cc.returnReached) {             return cc.returnValue;         }         throw new RuntimeException("Called function " + name + " with no RETURN");     } }

Тут у нас перед тем, как дёрнуть CallContext, из стека извлекаются те самые неявные параметры с записью датасета и её ключом, а также проставляются дефолтные значения для тех параметров, в которые был передан NULL.

После чего он и дёргается. А потом проверяется, а был ли RETURN. И если был, то возвращаем значение. Это универсальная обёртка, и ей вообще пофигу, какой стек операторов пришёл из парсера, какие были аргументы и т.п.

С точки же зрения интерпретатора выражений такая функция по поведению ничем не будет отличаться от того, что импортируется из classpath, и в соответствующем классе, отвечающем за вычисление выражений, тоже ничего менять не нужно.

Ну и последнее из важного. Надо подвязать это дело в самый верхний уровень интерпретатора, который проходит по всем операторам скрипта. Ну, это делается в одной-единственной точке:

Портяночка Java кода

private void createFunction(TDL.Create_funcContext ctx) {     String funcName = resolveName(ctx.func().L_IDENTIFIER());      if (Functions.FUNCTIONS.containsKey(funcName)) {         throw new InvalidConfigurationException("Attempt to CREATE FUNCTION which overrides pluggable \"" + funcName + "\"");     }      if ((ctx.K_REPLACE() == null) && library.functions.containsKey(funcName)) {         throw new InvalidConfigurationException("FUNCTION " + funcName + " has already been defined. Offending definition at line " + ctx.K_CREATE().getSymbol().getLine());     }      boolean recordLevel = ctx.K_RECORD() != null;     List<TDLFunction.StatementItem> items;     if (ctx.K_BEGIN() == null) {         items = List.of(TDLFunction.funcReturn(expression(ctx.expression().children, recordLevel ? ExpressionRules.RECORD : ExpressionRules.LOOSE)));     } else {         items = funcStatements(ctx.func_stmts().func_stmt(), recordLevel ? ExpressionRules.RECORD : ExpressionRules.LOOSE);     }      TDLFunction.Builder func = TDLFunction.builder(funcName, items, variables);     buildParams(ctx.proc_param(), func);      library.functions.put(funcName, recordLevel ? func.recordLevel() : func.loose()); }  private List<TDLFunction.StatementItem> funcStatements(List<TDL.Func_stmtContext> stmts, ExpressionRules rules) {     List<TDLFunction.StatementItem> items = new ArrayList<>();      for (TDL.Func_stmtContext funcStmt : stmts) {         if (funcStmt.let_func() != null) {             items.add(TDLFunction.funcLet(resolveName(funcStmt.let_func().var_name().L_IDENTIFIER()),                     expression(funcStmt.let_func().expression().children, rules)             ));         }         if (funcStmt.if_func() != null) {             items.add(TDLFunction.funcIf(expression(funcStmt.if_func().expression().children, rules),                     funcStatements(funcStmt.if_func().func_stmts(0).func_stmt(), rules),                     (funcStmt.if_func().func_stmts(1) != null)                             ? funcStatements(funcStmt.if_func().func_stmts(1).func_stmt(), rules)                             : null             ));         }         if (funcStmt.loop_func() != null) {             items.add(TDLFunction.funcLoop(resolveName(funcStmt.loop_func().var_name().L_IDENTIFIER()),                     expression(funcStmt.loop_func().expression().children, rules),                     funcStatements(funcStmt.loop_func().func_stmts(0).func_stmt(), rules),                     (funcStmt.loop_func().func_stmts(1) != null)                             ? funcStatements(funcStmt.loop_func().func_stmts(1).func_stmt(), rules)                             : null             ));         }         if (funcStmt.return_func() != null) {             items.add(TDLFunction.funcReturn(expression(funcStmt.return_func().expression().children, rules)));         }         if (funcStmt.raise_stmt() != null) {             String lvl = (funcStmt.raise_stmt().T_MSGLVL() != null) ? funcStmt.raise_stmt().T_MSGLVL().getText() : null;             items.add(TDLFunction.raise(lvl, expression(funcStmt.raise_stmt().expression().children, rules)));         }     }      return items; }

Тут у нас сначала происходит проверка на переопределение функции из classpath (чего мы не можем просто так допустить), а затем на наличие OR REPLACE (если надо сделать замену). После чего в засисимости от спецификатора RECORD функция билдится с соответствующими ExpressionRules, и используется билдер для простого RETURN, или сложного тела в BEGIN/END.

(Функция funcStatements() выглядит как кусок хтонической жести, но это издержки используемого парсера ANTLR, с ним такое write only безобразие вполне в порядке нормы.)

Пользовательские функции помещаются в тот же самый объект Library уровня скрипта, который мы когда-то ранее завели для хранения пользовательских процедур. Хороший пример заранее продуманной расширяемости.

Для полноты картины в то же самое место, которое делает DROP PROCEDURE, можно ещё и DROP FUNCTION добавить, ведь там всё ровно такое же — и удаляется пользовательская функция из той же самой Library.

Ну и во всех остальных слоях движка, таких как REPL, придётся добавлять соответствующие маленькие кусочки кода, благо, большая часть из них это просто копи-пейст того, что было добавлено для процедур с заменой CREATE/DROP PROCEDURE на CREATE/DROP FUNCTION. См. предыдущие статьи цикла.

Короче. Всего-то понадобилось порядка 300 с чем-то строк кода, и наш маленький диалект SQL стал не только ещё чуть более взрослым, но и в разы более могучим с точки зрения функциональности, доступной конечным пользователям.

Но тут будет ещё чем заняться когда-нибудь в будущем, поэтому stay tuned!

Исходники: https://github.com/PastorGL/datacooker-etl
Официальная группа в телеге: https://t.me/data_cooker_etl

Замечание для странных людей, которые триггерятся от слова «грант» в тегах

Да, разработка описываемого проекта не могла начаться без привлечения грантового финансирования. Также да, данная публикация действительно идёт в официальную отчётность по гранту, как и все остальные статьи в серии. И нет, вас абсолютно не касаются другие его условия, а код выложен в публичный доступ AS IS.


ссылка на оригинал статьи https://habr.com/ru/articles/915964/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *