Привет, Хабр!
Сегодня мы рассмотрим, как заставить PostgreSQL самостоятельно крутить K-Means для сегментации клиентов, не вытаскивая данные наружу. Пройдемся по циклу: нормализуем фичи в materialized view, напишем функцию PL/PythonU, которая дергает scikit-learn, сохраняем cluster_id обратно в таблицу и закрываем гештальт отчётом «доход по кластеру» чистым SQL.
Схема данных
Допустим, есть транзакции и базовая информация о клиентах:
CREATE TABLE public.customers ( customer_id bigint PRIMARY KEY, registered_at timestamptz, email text UNIQUE ); CREATE TABLE public.orders ( order_id bigint PRIMARY KEY, customer_id bigint REFERENCES public.customers, order_dt timestamptz NOT NULL, order_amount numeric(12,2) NOT NULL );
Генерация фичей и нормализация в Materialized View
Нужны числовые признаки на одного клиента: orders_cnt, days_since_last, mean_amount, total_amount. Сразу делаем z-score, чтобы K-Means не страдал от разных масштабов. Всё в одном запросе, а результат кешируем материализованным представлением:
CREATE MATERIALIZED VIEW ds.mv_customer_features AS WITH stats AS ( SELECT avg(orders_cnt)::numeric AS avg_orders_cnt, stddev_samp(orders_cnt) AS sd_orders_cnt, avg(days_since_last) AS avg_days_last, stddev_samp(days_since_last) AS sd_days_last, avg(mean_amount) AS avg_mean_amount, stddev_samp(mean_amount) AS sd_mean_amount, avg(total_amount) AS avg_total_amount, stddev_samp(total_amount) AS sd_total_amount FROM ( SELECT c.customer_id, COUNT(o.*) AS orders_cnt, EXTRACT(EPOCH FROM (now() - MAX(o.order_dt))) / 86400 AS days_since_last, AVG(o.order_amount) AS mean_amount, SUM(o.order_amount) AS total_amount FROM public.customers c LEFT JOIN public.orders o USING (customer_id) GROUP BY c.customer_id ) sub ), base AS ( SELECT c.customer_id, COUNT(o.*) AS orders_cnt, EXTRACT(EPOCH FROM (now() - MAX(o.order_dt))) / 86400 AS days_since_last, AVG(o.order_amount) AS mean_amount, SUM(o.order_amount) AS total_amount FROM public.customers c LEFT JOIN public.orders o USING (customer_id) GROUP BY c.customer_id ), z AS ( SELECT b.customer_id, (b.orders_cnt - s.avg_orders_cnt) / NULLIF(s.sd_orders_cnt,0) AS z_orders_cnt, (b.days_since_last - s.avg_days_last) / NULLIF(s.sd_days_last,0) AS z_days_last, (b.mean_amount - s.avg_mean_amount) / NULLIF(s.sd_mean_amount,0) AS z_mean_amount, (b.total_amount - s.avg_total_amount) / NULLIF(s.sd_total_amount,0) AS z_total_amount FROM base b CROSS JOIN stats s ) SELECT * FROM z;
Материализованный вид хорош тем, что его можно освежать по расписанию (REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features;) и он даёт индексы, если понадобится. Под капотом всё стандартный SQL, никаких костылей.
Подключаем PL/PythonU
CREATE EXTENSION IF NOT EXISTS plpython3u;
Права выдаём только чтение на ds.mv_customer_features и запись на public.customers.cluster_id.
Хранимая функция K-Means
Функция получает желаемое число кластеров (k), обучает K-Means на нормированных фичах, сохраняет модель в JSON (для истории) и пишет номер кластера клиенту в таблицу.
CREATE OR REPLACE FUNCTION ds.build_customer_clusters(k int DEFAULT 5) RETURNS void LANGUAGE plpython3u SECURITY DEFINER AS $$ import json from sklearn.cluster import KMeans from sklearn.exceptions import ConvergenceWarning import warnings plpy.execute("SET search_path TO ds, public") # 1. Забираем данные rows = plpy.execute(""" SELECT customer_id, ARRAY[z_orders_cnt, z_days_last, z_mean_amount, z_total_amount] AS f FROM ds.mv_customer_features WHERE z_orders_cnt IS NOT NULL """) if len(rows) < k: plpy.error(f"Not enough data points ({len(rows)}) for k={k}") cust_ids = [r['customer_id'] for r in rows] X = [r['f'] for r in rows] # 2. Обучаем модель warnings.filterwarnings("ignore", category=ConvergenceWarning) model = KMeans(n_clusters=k, n_init='auto', random_state=42) model.fit(X) labels = model.labels_ # 3. Записываем кластера tuples = [{'customer_id': cid, 'cluster_id': int(lbl)} for cid, lbl in zip(cust_ids, labels)] plpy.execute("CREATE TEMP TABLE _tmp_cluster (customer_id bigint, cluster_id int) ON COMMIT DROP") plpy.execute("INSERT INTO _tmp_cluster VALUES " + ", ".join(f"({t['customer_id']}, {t['cluster_id']})" for t in tuples)) plpy.execute(""" UPDATE public.customers c SET cluster_id = t.cluster_id FROM _tmp_cluster t WHERE t.customer_id = c.customer_id """) # 4. Сериализуем модель (опционально) plpy.execute(""" INSERT INTO ds.model_registry(model_name, trained_at, params, inertia) VALUES ('customer_kmeans', now(), $1, $2) """, [json.dumps(model.get_params()), float(model.inertia_)]) $$;
SECURITY DEFINER исполняется с правами владельца, пользователи не получат лишние привилегии. Темп-таблица для массового апдейта быстрее, чем UPDATE … FROM (VALUES …) на тысячи строк. random_state фиксируем, чтобы результаты воспроизводились.
Запуск и расписание
Разово:
SELECT ds.build_customer_clusters(6); REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features;
Еще удобно делать REFRESH и перестройку модели ночью cron-джобой или pg_cron:
SELECT cron.schedule('0 3 * * *', $$REFRESH MATERIALIZED VIEW CONCURRENTLY ds.mv_customer_features; SELECT ds.build_customer_clusters(6);$$);
Отчёт «доход по кластеру»
Никакого Python — только SQL:
WITH rev AS ( SELECT c.cluster_id, SUM(o.order_amount) AS revenue FROM public.customers c JOIN public.orders o USING (customer_id) GROUP BY c.cluster_id ) SELECT cluster_id, revenue, ROUND(revenue * 100.0 / SUM(revenue) OVER (), 2) AS revenue_pct FROM rev ORDER BY revenue DESC;
Результат сразу готов к дашборду: видно долю каждого сегмента в общей выручке.
Итог
Если у вас уже есть опыт сегментации на стороне БД — делитесь в комментариях: как масштабировали, какие проблемы ловили, где K-Means не зашёл. Чем больше примеров, тем полезнее статья для всех.
Погрузитесь в процесс разработки ПО с нуля: научитесь учитывать цели бизнеса и формулировать технические требования к продукту на базовом курсе «Системный аналитик».
Чтобы оставаться в курсе актуальных технологий и трендов, подписывайтесь на Telegram-канал OTUS.
ссылка на оригинал статьи https://habr.com/ru/articles/930506/
Добавить комментарий