Разработка пользовательских агрегатных функций для аналитики в MySQL

от автора

Привет, Хабр!

Вы когда‑нибудь писали аналитические запросы в MySQL и понимали, что встроенных функций вам не хватает? Хотите посчитать медиану зарплат? 99-й процентиль времени ответа запросов? Собрать JSON‑массив прямо в базе данных?

В MySQL нет MEDIAN(), PERCENTILE_CONT() и нормального способа объединить данные в JSON. Всё приходится делать через костыли.

Решение? Написать собственную агрегатную функцию на C++, которая будет работать так же, как SUM() и AVG(), но делать то, что вам реально нужно.

Почему UDF на C++?

В MySQL есть несколько способов расширять возможности:

  • Хранимые процедуры (SQL) → медленные, не работают с агрегатами.

  • Плагины (C++) → сложно писать, требует компиляции.

  • UDF (C++) → быстрее SQL, работает как встроенная функция, но требует компиляции.

Поэтому если нужна быстрая кастомная функция типа SUM() или AVG(), UDF — лучший вариант.

Почему стандартные агрегатные функции MySQL не всегда работают?

Допустим, есть таблица employees:

CREATE TABLE employees (     id INT AUTO_INCREMENT PRIMARY KEY,      department VARCHAR(50),      salary DOUBLE );  INSERT INTO employees (department, salary) VALUES ('IT', 100000), ('IT', 120000), ('IT', 110000),  ('HR', 50000), ('HR', 60000), ('HR', 55000); 

Мы хотим посчитать медианную зарплату в каждом отделе.

Среднее (AVG()) не всегда даёт правильную картину. Например:

SELECT department, AVG(salary) FROM employees GROUP BY department;

Допустим, есть три зарплаты: 100k, 120k, 5M, то AVG(salary) = 1.74M, но реальная медиана = 120k.

Можно попробовать SQL‑хак с GROUP_CONCAT():

SELECT department,         SUBSTRING_INDEX(SUBSTRING_INDEX(GROUP_CONCAT(salary ORDER BY salary), ',', COUNT(*)/2), ',', -1) AS median FROM employees GROUP BY department;

Но этот запрос сломается, если зарплат много. GROUP_CONCAT() имеет лимит (group_concat_max_len), а если данных миллионы строк, MySQL начнёт тормозить.

Что делать? Реализовать нормальную агрегатную функцию median().

Как MySQL выполняет агрегатные функции?

Любая агрегатная функция (SUM(), AVG(), COUNT()) в MySQL работает по трём этапам:

  1. Создаёт объект для хранения промежуточных данных (SUM_INIT()).

  2. Добавляет в него каждое значение (SUM_ADD()).

  3. Вычисляет и возвращает результат (SUM_FINAL()).

Для SUM(salary) MySQL держит переменную, в которую суммирует значения. Наша задача — сделать то же самое, но для медианы, JSON‑агрегации и квантилей.

Разработка UDF median() на C++

Создадим файл udf_median.cpp.

#include <mysql/mysql.h> #include <vector> #include <algorithm>  // Структура для хранения промежуточных значений struct MedianCtx {     std::vector<double> values; };  // Инициализация UDF extern "C" my_bool median_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {     if (args->arg_count != 1 || args->arg_type[0] != REAL_RESULT) {         strcpy(message, "median() принимает только один аргумент типа DOUBLE");         return 1;     }      initid->ptr = (char*) new MedianCtx();     return 0; }  // Добавление нового значения в группу extern "C" void median_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {     MedianCtx *ctx = (MedianCtx*) initid->ptr;     ctx->values.push_back(*(double*) args->args[0]); }  // Вычисление медианы extern "C" double median(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {     MedianCtx *ctx = (MedianCtx*) initid->ptr;     if (ctx->values.empty()) {         *is_null = 1;         return 0;     }      std::sort(ctx->values.begin(), ctx->values.end());     size_t n = ctx->values.size();     return (n % 2 == 0) ? (ctx->values[n/2 - 1] + ctx->values[n/2]) / 2 : ctx->values[n/2]; }  // Очистка памяти extern "C" void median_deinit(UDF_INIT *initid) {     delete (MedianCtx*) initid->ptr; }

median_init() создаёт объект для хранения значений. median_add() добавляет в массив новые числа. median() сортирует массив и возвращает медиану. median_deinit() чистит память, чтобы не было утечек.

Компиляция и установка UDF в MySQL

g++ -shared -o udf_median.so -fPIC udf_median.cpp `mysql_config --include` sudo cp udf_median.so /usr/lib/mysql/plugin/

Регистрируем функцию:

CREATE AGGREGATE FUNCTION median RETURNS REAL SONAME 'udf_median.so';

Теперь можно её использовать.

Ещё три примера использования UDF в MySQL

json_agg(): агрегируем данные в JSON

В MySQL нет аналога JSON_AGG(), который есть в PostgreSQL. Нжно собрать JSON‑массив значений, но GROUP_CONCAT() не поддерживает JSON‑кодирование.

Создаём файл udf_json_agg.cpp:

#include <mysql/mysql.h> #include <vector> #include <string> #include <sstream>  // Контекст для хранения JSON-данных struct JsonAggCtx {     std::vector<std::string> values; };  // Инициализация UDF extern "C" my_bool json_agg_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {     if (args->arg_count != 1 || args->arg_type[0] != STRING_RESULT) {         strcpy(message, "json_agg() принимает только один аргумент типа STRING");         return 1;     }     initid->ptr = (char*) new JsonAggCtx();     return 0; }  // Добавление нового значения extern "C" void json_agg_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {     if (args->args[0] == NULL) return;  // Пропускаем NULL-значения     JsonAggCtx *ctx = (JsonAggCtx*) initid->ptr;     ctx->values.push_back(std::string(args->args[0])); }  // Возвращаем JSON extern "C" char* json_agg(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error) {     JsonAggCtx *ctx = (JsonAggCtx*) initid->ptr;     std::ostringstream json;     json << "[";     for (size_t i = 0; i < ctx->values.size(); ++i) {         if (i > 0) json << ",";         json << "\"" << ctx->values[i] << "\"";     }     json << "]";     std::string json_str = json.str();     *length = json_str.size();      char* res = strdup(json_str.c_str());     initid->ptr = (char*) res;  // Сохраняем для освобождения     return res; }  // Очистка памяти extern "C" void json_agg_deinit(UDF_INIT *initid) {     free(initid->ptr);  // Очищаем память после использования     delete (JsonAggCtx*) initid->ptr; }

Теперь можно делать вот так:

SELECT order_id, json_agg(product_name) FROM orders GROUP BY order_id;

И получить нормальный JSON:

[     {"order_id": 1, "products": ["Laptop", "Mouse", "Keyboard"]},     {"order_id": 2, "products": ["Phone", "Charger"]} ]

Теперь можно работать с JSON‑функциями MySQL (JSON_EXTRACT(), JSON_CONTAINS()).

percentile(): вычисляем 95-й и 99-й процентиль

В аналитике важно знать 90-й, 95-й, 99-й процентиль. Если средний запрос выполняется за 100 мс, но 99-й процентиль = 2 секунды → значит, у 1% пользователей сайт жутко тормозит. Для метрик SLA важно следить не за средним временем ответа, а за худшими 1–5%. В PostgreSQL есть PERCENTILE_CONT(), в MySQL — нет.

Напишем UDF percentile(), который работает с GROUP BY:

#include <mysql/mysql.h> #include <vector> #include <algorithm>  // Контекст для хранения значений struct PercentileCtx {     std::vector<double> values; };  // Инициализация UDF extern "C" my_bool percentile_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {     if (args->arg_count != 2 || args->arg_type[0] != REAL_RESULT || args->arg_type[1] != REAL_RESULT) {         strcpy(message, "percentile() принимает два аргумента: (double, double)");         return 1;     }     initid->ptr = (char*) new PercentileCtx();     return 0; }  // Добавляем новое значение в массив extern "C" void percentile_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {     if (args->args[1] == NULL) return;  // Пропускаем NULL-значения     PercentileCtx *ctx = (PercentileCtx*) initid->ptr;     ctx->values.push_back(*(double*) args->args[1]); }  // Вычисляем процентиль extern "C" double percentile(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {     PercentileCtx *ctx = (PercentileCtx*) initid->ptr;     if (ctx->values.empty()) {         *is_null = 1;         return 0;     }      double p = *(double*) args->args[0]; // Процентиль (например, 0.95)     if (ctx->values.size() < 2) return ctx->values[0]; // Не сортируем 1 элемент      std::sort(ctx->values.begin(), ctx->values.end());     size_t index = (size_t)(p * ctx->values.size());     return ctx->values[std::min(index, ctx->values.size() - 1)]; }  // Очистка памяти extern "C" void percentile_deinit(UDF_INIT *initid) {     delete (PercentileCtx*) initid->ptr; }

Теперь можно делать:

SELECT percentile(0.95, response_time) FROM api_logs;

И получить 95-й процентиль времени ответа API.

running_avg()

Если у вас есть данные по временным рядам (например, средняя температура за день), нужно усреднять данные за N последних дней.

Напишем UDF running_avg(), который считает среднее по скользящему окну:

#include <mysql/mysql.h> #include <deque>  // Контекст для хранения скользящего среднего struct RunningAvgCtx {     std::deque<double> window;     double sum = 0;     size_t window_size; };  // Инициализация UDF extern "C" my_bool running_avg_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {     if (args->arg_count != 2 || args->arg_type[0] != REAL_RESULT || args->arg_type[1] != INT_RESULT) {         strcpy(message, "running_avg() принимает два аргумента: (double, int)");         return 1;     }     initid->ptr = (char*) new RunningAvgCtx();     return 0; }  // Добавляем значение extern "C" void running_avg_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {     RunningAvgCtx *ctx = (RunningAvgCtx*) initid->ptr;     double value = *(double*) args->args[0];     size_t window_size = *(long long*) args->args[1];      if (window_size == 0) return;  // Защита от деления на 0      ctx->window.push_back(value);     ctx->sum += value;      if (ctx->window.size() > window_size) {         ctx->sum -= ctx->window.front();         ctx->window.pop_front();     } }  // Возвращаем среднее extern "C" double running_avg(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) {     RunningAvgCtx *ctx = (RunningAvgCtx*) initid->ptr;     return ctx->window.empty() ? 0 : ctx->sum / ctx->window.size(); }  // Очистка памяти extern "C" void running_avg_deinit(UDF_INIT *initid) {     delete (RunningAvgCtx*) initid->ptr; }

Теперь можно делать:

SELECT date, running_avg(temperature, 7) OVER (ORDER BY date) FROM weather; 

И получать усреднённые данные по 7 дням.


Всем, кому интересен системный анализ, рекомендуем обратить внимание на открытые уроки, которые пройдут в Otus в марте:


ссылка на оригинал статьи https://habr.com/ru/articles/886424/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *