Сегментация клиентов методом K-Means на стороне БД

от автора

Привет, Хабр!

Сегодня мы рассмотрим, как заставить PostgreSQL самостоятельно крутить K-Means для сегментации клиентов, не вытаскивая данные наружу. Пройдемся по циклу: нормализуем фичи в materialized view, напишем функцию PL/PythonU, которая дергает scikit-learn, сохраняем cluster_id обратно в таблицу и закрываем гештальт отчётом «доход по кластеру» чистым SQL.

Схема данных

Допустим, есть транзакции и базовая информация о клиентах:

CREATE TABLE public.customers (     customer_id      bigint PRIMARY KEY,     registered_at    timestamptz,     email            text UNIQUE );  CREATE TABLE public.orders (     order_id         bigint PRIMARY KEY,     customer_id      bigint REFERENCES public.customers,     order_dt         timestamptz NOT NULL,     order_amount     numeric(12,2) NOT NULL );

Генерация фичей и нормализация в Materialized View

Нужны числовые признаки на одного клиента: orders_cnt, days_since_last, mean_amount, total_amount. Сразу делаем z-score, чтобы K-Means не страдал от разных масштабов. Всё в одном запросе, а результат кешируем материализованным представлением:

CREATE MATERIALIZED VIEW ds.mv_customer_features AS WITH stats AS (     SELECT         avg(orders_cnt)::numeric  AS avg_orders_cnt,         stddev_samp(orders_cnt)   AS sd_orders_cnt,         avg(days_since_last)      AS avg_days_last,         stddev_samp(days_since_last) AS sd_days_last,         avg(mean_amount)          AS avg_mean_amount,         stddev_samp(mean_amount)  AS sd_mean_amount,         avg(total_amount)         AS avg_total_amount,         stddev_samp(total_amount) AS sd_total_amount     FROM (         SELECT             c.customer_id,             COUNT(o.*)                      AS orders_cnt,             EXTRACT(EPOCH FROM (now() - MAX(o.order_dt))) / 86400 AS days_since_last,             AVG(o.order_amount)             AS mean_amount,             SUM(o.order_amount)             AS total_amount         FROM public.customers c         LEFT JOIN public.orders o USING (customer_id)         GROUP BY c.customer_id     ) sub ), base AS (     SELECT         c.customer_id,         COUNT(o.*)                      AS orders_cnt,         EXTRACT(EPOCH FROM (now() - MAX(o.order_dt))) / 86400 AS days_since_last,         AVG(o.order_amount)             AS mean_amount,         SUM(o.order_amount)             AS total_amount     FROM public.customers c     LEFT JOIN public.orders o USING (customer_id)     GROUP BY c.customer_id ), z AS (     SELECT         b.customer_id,         (b.orders_cnt    - s.avg_orders_cnt)   / NULLIF(s.sd_orders_cnt,0)   AS z_orders_cnt,         (b.days_since_last - s.avg_days_last)  / NULLIF(s.sd_days_last,0)    AS z_days_last,         (b.mean_amount   - s.avg_mean_amount)  / NULLIF(s.sd_mean_amount,0)  AS z_mean_amount,         (b.total_amount  - s.avg_total_amount) / NULLIF(s.sd_total_amount,0) AS z_total_amount     FROM base b CROSS JOIN stats s ) SELECT * FROM z;

Материализованный вид хорош тем, что его можно освежать по расписанию (REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features;) и он даёт индексы, если понадобится. Под капотом всё стандартный SQL, никаких костылей.

Подключаем PL/PythonU

CREATE EXTENSION IF NOT EXISTS plpython3u;

Права выдаём только чтение на ds.mv_customer_features и запись на public.customers.cluster_id.

Хранимая функция K-Means

Функция получает желаемое число кластеров (k), обучает K-Means на нормированных фичах, сохраняет модель в JSON (для истории) и пишет номер кластера клиенту в таблицу.

CREATE OR REPLACE FUNCTION ds.build_customer_clusters(k int DEFAULT 5) RETURNS void LANGUAGE plpython3u SECURITY DEFINER AS $$ import json from sklearn.cluster import KMeans from sklearn.exceptions import ConvergenceWarning import warnings  plpy.execute("SET search_path TO ds, public")  # 1. Забираем данные rows = plpy.execute("""     SELECT customer_id,            ARRAY[z_orders_cnt, z_days_last, z_mean_amount, z_total_amount] AS f     FROM ds.mv_customer_features     WHERE z_orders_cnt IS NOT NULL """)  if len(rows) < k:     plpy.error(f"Not enough data points ({len(rows)}) for k={k}")  cust_ids = [r['customer_id'] for r in rows] X = [r['f'] for r in rows]  # 2. Обучаем модель warnings.filterwarnings("ignore", category=ConvergenceWarning) model = KMeans(n_clusters=k, n_init='auto', random_state=42) model.fit(X) labels = model.labels_  # 3. Записываем кластера tuples = [{'customer_id': cid, 'cluster_id': int(lbl)} for cid, lbl in zip(cust_ids, labels)] plpy.execute("CREATE TEMP TABLE _tmp_cluster (customer_id bigint, cluster_id int) ON COMMIT DROP") plpy.execute("INSERT INTO _tmp_cluster VALUES " +               ", ".join(f"({t['customer_id']}, {t['cluster_id']})" for t in tuples))  plpy.execute("""     UPDATE public.customers c     SET cluster_id = t.cluster_id     FROM _tmp_cluster t     WHERE t.customer_id = c.customer_id """)  # 4. Сериализуем модель (опционально) plpy.execute("""     INSERT INTO ds.model_registry(model_name, trained_at, params, inertia)     VALUES ('customer_kmeans', now(), $1, $2) """, [json.dumps(model.get_params()), float(model.inertia_)]) $$;

SECURITY DEFINER исполняется с правами владельца, пользователи не получат лишние привилегии. Темп-таблица для массового апдейта быстрее, чем UPDATE … FROM (VALUES …) на тысячи строк. random_state фиксируем, чтобы результаты воспроизводились.

Запуск и расписание

Разово:

SELECT ds.build_customer_clusters(6); REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features;

Еще удобно делать REFRESH и перестройку модели ночью cron-джобой или pg_cron:

SELECT cron.schedule('0 3 * * *', $$REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features;                        SELECT ds.build_customer_clusters(6);$$);

Отчёт «доход по кластеру»

Никакого Python — только SQL:

WITH rev AS (     SELECT         c.cluster_id,         SUM(o.order_amount) AS revenue     FROM public.customers c     JOIN public.orders o USING (customer_id)     GROUP BY c.cluster_id ) SELECT     cluster_id,     revenue,     ROUND(revenue * 100.0 / SUM(revenue) OVER (), 2) AS revenue_pct FROM rev ORDER BY revenue DESC;

Результат сразу готов к дашборду: видно долю каждого сегмента в общей выручке.

Итог

Если у вас уже есть опыт сегментации на стороне БД — делитесь в комментариях: как масштабировали, какие проблемы ловили, где K-Means не зашёл. Чем больше примеров, тем полезнее статья для всех.

Погрузитесь в процесс разработки ПО с нуля: научитесь учитывать цели бизнеса и формулировать технические требования к продукту на базовом курсе «Системный аналитик».

Чтобы оставаться в курсе актуальных технологий и трендов, подписывайтесь на Telegram-канал OTUS.


ссылка на оригинал статьи https://habr.com/ru/articles/930506/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *