Привет, Хабр!
Вы когда‑нибудь писали аналитические запросы в MySQL и понимали, что встроенных функций вам не хватает? Хотите посчитать медиану зарплат? 99-й процентиль времени ответа запросов? Собрать JSON‑массив прямо в базе данных?
В MySQL нет MEDIAN(), PERCENTILE_CONT() и нормального способа объединить данные в JSON. Всё приходится делать через костыли.
Решение? Написать собственную агрегатную функцию на C++, которая будет работать так же, как SUM() и AVG(), но делать то, что вам реально нужно.
Почему UDF на C++?
В MySQL есть несколько способов расширять возможности:
-
Хранимые процедуры (SQL) → медленные, не работают с агрегатами.
-
Плагины (C++) → сложно писать, требует компиляции.
-
UDF (C++) → быстрее SQL, работает как встроенная функция, но требует компиляции.
Поэтому если нужна быстрая кастомная функция типа SUM() или AVG(), UDF — лучший вариант.
Почему стандартные агрегатные функции MySQL не всегда работают?
Допустим, есть таблица employees:
CREATE TABLE employees ( id INT AUTO_INCREMENT PRIMARY KEY, department VARCHAR(50), salary DOUBLE ); INSERT INTO employees (department, salary) VALUES ('IT', 100000), ('IT', 120000), ('IT', 110000), ('HR', 50000), ('HR', 60000), ('HR', 55000);
Мы хотим посчитать медианную зарплату в каждом отделе.
Среднее (AVG()) не всегда даёт правильную картину. Например:
SELECT department, AVG(salary) FROM employees GROUP BY department;
Допустим, есть три зарплаты: 100k, 120k, 5M, то AVG(salary) = 1.74M, но реальная медиана = 120k.
Можно попробовать SQL‑хак с GROUP_CONCAT():
SELECT department, SUBSTRING_INDEX(SUBSTRING_INDEX(GROUP_CONCAT(salary ORDER BY salary), ',', COUNT(*)/2), ',', -1) AS median FROM employees GROUP BY department;
Но этот запрос сломается, если зарплат много. GROUP_CONCAT() имеет лимит (group_concat_max_len), а если данных миллионы строк, MySQL начнёт тормозить.
Что делать? Реализовать нормальную агрегатную функцию median().
Как MySQL выполняет агрегатные функции?
Любая агрегатная функция (SUM(), AVG(), COUNT()) в MySQL работает по трём этапам:
-
Создаёт объект для хранения промежуточных данных (
SUM_INIT()). -
Добавляет в него каждое значение (
SUM_ADD()). -
Вычисляет и возвращает результат (
SUM_FINAL()).
Для SUM(salary) MySQL держит переменную, в которую суммирует значения. Наша задача — сделать то же самое, но для медианы, JSON‑агрегации и квантилей.
Разработка UDF median() на C++
Создадим файл udf_median.cpp.
#include <mysql/mysql.h> #include <vector> #include <algorithm> // Структура для хранения промежуточных значений struct MedianCtx { std::vector<double> values; }; // Инициализация UDF extern "C" my_bool median_init(UDF_INIT *initid, UDF_ARGS *args, char *message) { if (args->arg_count != 1 || args->arg_type[0] != REAL_RESULT) { strcpy(message, "median() принимает только один аргумент типа DOUBLE"); return 1; } initid->ptr = (char*) new MedianCtx(); return 0; } // Добавление нового значения в группу extern "C" void median_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) { MedianCtx *ctx = (MedianCtx*) initid->ptr; ctx->values.push_back(*(double*) args->args[0]); } // Вычисление медианы extern "C" double median(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) { MedianCtx *ctx = (MedianCtx*) initid->ptr; if (ctx->values.empty()) { *is_null = 1; return 0; } std::sort(ctx->values.begin(), ctx->values.end()); size_t n = ctx->values.size(); return (n % 2 == 0) ? (ctx->values[n/2 - 1] + ctx->values[n/2]) / 2 : ctx->values[n/2]; } // Очистка памяти extern "C" void median_deinit(UDF_INIT *initid) { delete (MedianCtx*) initid->ptr; }
median_init() создаёт объект для хранения значений. median_add() добавляет в массив новые числа. median() сортирует массив и возвращает медиану. median_deinit() чистит память, чтобы не было утечек.
Компиляция и установка UDF в MySQL
g++ -shared -o udf_median.so -fPIC udf_median.cpp `mysql_config --include` sudo cp udf_median.so /usr/lib/mysql/plugin/
Регистрируем функцию:
CREATE AGGREGATE FUNCTION median RETURNS REAL SONAME 'udf_median.so';
Теперь можно её использовать.
Ещё три примера использования UDF в MySQL
json_agg(): агрегируем данные в JSON
В MySQL нет аналога JSON_AGG(), который есть в PostgreSQL. Нжно собрать JSON‑массив значений, но GROUP_CONCAT() не поддерживает JSON‑кодирование.
Создаём файл udf_json_agg.cpp:
#include <mysql/mysql.h> #include <vector> #include <string> #include <sstream> // Контекст для хранения JSON-данных struct JsonAggCtx { std::vector<std::string> values; }; // Инициализация UDF extern "C" my_bool json_agg_init(UDF_INIT *initid, UDF_ARGS *args, char *message) { if (args->arg_count != 1 || args->arg_type[0] != STRING_RESULT) { strcpy(message, "json_agg() принимает только один аргумент типа STRING"); return 1; } initid->ptr = (char*) new JsonAggCtx(); return 0; } // Добавление нового значения extern "C" void json_agg_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) { if (args->args[0] == NULL) return; // Пропускаем NULL-значения JsonAggCtx *ctx = (JsonAggCtx*) initid->ptr; ctx->values.push_back(std::string(args->args[0])); } // Возвращаем JSON extern "C" char* json_agg(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error) { JsonAggCtx *ctx = (JsonAggCtx*) initid->ptr; std::ostringstream json; json << "["; for (size_t i = 0; i < ctx->values.size(); ++i) { if (i > 0) json << ","; json << "\"" << ctx->values[i] << "\""; } json << "]"; std::string json_str = json.str(); *length = json_str.size(); char* res = strdup(json_str.c_str()); initid->ptr = (char*) res; // Сохраняем для освобождения return res; } // Очистка памяти extern "C" void json_agg_deinit(UDF_INIT *initid) { free(initid->ptr); // Очищаем память после использования delete (JsonAggCtx*) initid->ptr; }
Теперь можно делать вот так:
SELECT order_id, json_agg(product_name) FROM orders GROUP BY order_id;
И получить нормальный JSON:
[ {"order_id": 1, "products": ["Laptop", "Mouse", "Keyboard"]}, {"order_id": 2, "products": ["Phone", "Charger"]} ]
Теперь можно работать с JSON‑функциями MySQL (JSON_EXTRACT(), JSON_CONTAINS()).
percentile(): вычисляем 95-й и 99-й процентиль
В аналитике важно знать 90-й, 95-й, 99-й процентиль. Если средний запрос выполняется за 100 мс, но 99-й процентиль = 2 секунды → значит, у 1% пользователей сайт жутко тормозит. Для метрик SLA важно следить не за средним временем ответа, а за худшими 1–5%. В PostgreSQL есть PERCENTILE_CONT(), в MySQL — нет.
Напишем UDF percentile(), который работает с GROUP BY:
#include <mysql/mysql.h> #include <vector> #include <algorithm> // Контекст для хранения значений struct PercentileCtx { std::vector<double> values; }; // Инициализация UDF extern "C" my_bool percentile_init(UDF_INIT *initid, UDF_ARGS *args, char *message) { if (args->arg_count != 2 || args->arg_type[0] != REAL_RESULT || args->arg_type[1] != REAL_RESULT) { strcpy(message, "percentile() принимает два аргумента: (double, double)"); return 1; } initid->ptr = (char*) new PercentileCtx(); return 0; } // Добавляем новое значение в массив extern "C" void percentile_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) { if (args->args[1] == NULL) return; // Пропускаем NULL-значения PercentileCtx *ctx = (PercentileCtx*) initid->ptr; ctx->values.push_back(*(double*) args->args[1]); } // Вычисляем процентиль extern "C" double percentile(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) { PercentileCtx *ctx = (PercentileCtx*) initid->ptr; if (ctx->values.empty()) { *is_null = 1; return 0; } double p = *(double*) args->args[0]; // Процентиль (например, 0.95) if (ctx->values.size() < 2) return ctx->values[0]; // Не сортируем 1 элемент std::sort(ctx->values.begin(), ctx->values.end()); size_t index = (size_t)(p * ctx->values.size()); return ctx->values[std::min(index, ctx->values.size() - 1)]; } // Очистка памяти extern "C" void percentile_deinit(UDF_INIT *initid) { delete (PercentileCtx*) initid->ptr; }
Теперь можно делать:
SELECT percentile(0.95, response_time) FROM api_logs;
И получить 95-й процентиль времени ответа API.
running_avg()
Если у вас есть данные по временным рядам (например, средняя температура за день), нужно усреднять данные за N последних дней.
Напишем UDF running_avg(), который считает среднее по скользящему окну:
#include <mysql/mysql.h> #include <deque> // Контекст для хранения скользящего среднего struct RunningAvgCtx { std::deque<double> window; double sum = 0; size_t window_size; }; // Инициализация UDF extern "C" my_bool running_avg_init(UDF_INIT *initid, UDF_ARGS *args, char *message) { if (args->arg_count != 2 || args->arg_type[0] != REAL_RESULT || args->arg_type[1] != INT_RESULT) { strcpy(message, "running_avg() принимает два аргумента: (double, int)"); return 1; } initid->ptr = (char*) new RunningAvgCtx(); return 0; } // Добавляем значение extern "C" void running_avg_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) { RunningAvgCtx *ctx = (RunningAvgCtx*) initid->ptr; double value = *(double*) args->args[0]; size_t window_size = *(long long*) args->args[1]; if (window_size == 0) return; // Защита от деления на 0 ctx->window.push_back(value); ctx->sum += value; if (ctx->window.size() > window_size) { ctx->sum -= ctx->window.front(); ctx->window.pop_front(); } } // Возвращаем среднее extern "C" double running_avg(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) { RunningAvgCtx *ctx = (RunningAvgCtx*) initid->ptr; return ctx->window.empty() ? 0 : ctx->sum / ctx->window.size(); } // Очистка памяти extern "C" void running_avg_deinit(UDF_INIT *initid) { delete (RunningAvgCtx*) initid->ptr; }
Теперь можно делать:
SELECT date, running_avg(temperature, 7) OVER (ORDER BY date) FROM weather;
И получать усреднённые данные по 7 дням.
Всем, кому интересен системный анализ, рекомендуем обратить внимание на открытые уроки, которые пройдут в Otus в марте:
-
6 марта. Use Cases: Как улучшить требования к проекту.
Узнать подробнее -
17 марта. Переезд с монолита на микросервисы: когда, зачем и как.
Узнать подробнее
ссылка на оригинал статьи https://habr.com/ru/articles/886424/
Добавить комментарий