Slow Feature Analysis. Разбор метода и реализация на Python с нуля

от автора

Привет, Хабр!

В этой статье я хочу рассказать про метод обучения без учителя — “Анализ медленных признаков” (Slow Feature Analysis), далее SFA. Метод был разработан в 2002 году Лоренцом Вискоттом и Терренсом Сейновски.

SFA можно использовать для выделения стабильных сигналов на фоне шума, такие как отслеживание объектов на видео, трендов цен из финансовых данных, признаков износа по вибрациям оборудования.

SFA — метод изучения инвариантных или медленно изменяющихся признаков на основе векторного входного сигнала. Эту задачу можно свести к минимизации скорости изменения проекций. В статье будет рассмотрен многомерный случай, но принцип легко обобщается на одномерный через вложение Такенса.

Что такое вложение Такенса?

Вложение Такенса — метод реконструкции фазового пространства динамической системы по временному ряду.
Чтобы воспользоваться этим методом нужно определить минимальное необходимое пространство D. Сделать это можно методом ложных ближайших соседей. По теореме Такенса если собладается

D >= 2k + 1, где k истинная размерность аттрактора, то реконструкция будет топологически эквивалентна исходной системе. Далее нам нужно выбрать временную задержку \tau и мы сможем построить из исходного ряда векторы размерности D:

X(t)=(x(t), x(t + τ), x(t + 2τ), ...x(t + (D − 1)τ).

Каждая точка в новом d-мерном пространстве соответствует “окну” длины D из исходного ряда.

Таким образом, даже если у вас одномерный временной ряд, вы можете сначала применить вложение Такенса для восстановления фазового пространства, а затем подать полученный многомерный сигнал на вход SFA для извлечения медленных управляющих параметров системы.

Объяснение алгоритма и реализация его на Python

Пусть у нас есть многомерный временной сигнал X(t) длины T и количеством признаков N такой, что:

X(t) = [x_1(t), x_2(t), x_3(t), ... x_N(t)],

Где x_i(t) значения временного сигнала, а t = 1, 2, …, T.

Для этой задачи будет создан искусственный пример — три различных сигнала:

  • Синус с белым шумом

  • Синус с угасающим розовым шумом

  • Синус с экстремальными значениями и броуновским блужданием.

Примечание: наличие розового шума, броуновского блуждания и выбросов сделано намеренно, чтобы подсветить границы применимости метода. Как будет показано далее на графиках, SFA успешно выделяет общий тренд, но его весовые коэффициенты для зашумленных каналов стремятся к нулю, эффективно отключая “сломанный сигнал”. Это показывает как силу, так и ограничение метода — он не восстанавливает сигнал из шума, а стремится игнорировать источник шума.

В примере будут использоваться исходные три столбца, но для более сложных задач можно создавать степенные, логарифмические и более сложные признаки и их композиции.

series_1

series_2

series_3

0

0.3249

-0.6801

0.2065

1

-0.0972

0.1738

0.7730

2

-0.0553

-0.3769

0.3526

3

-0.1391

0.0540

0.9979

4

0.2736

0.3680

0.7669

Код на Python
import numpy as npimport pandas as pdnp.random.seed(1)pd.set_option('display.float_format', '{:.4f}'.format)data_size = 500# 4 периода синусаx = np.linspace(0, 4 * np.pi, data_size) sin_wave = np.sin(x)# Создание синуса с белым шумомwhite_noise = np.random.normal(0, 0.2, data_size)series_1 = sin_wave + white_noise# Создание синуса угасающим розовым шумомpink_noise = (lambda x: x / np.std(x))(    np.real(        np.fft.ifft(            np.concatenate([                [0],                (1 / np.sqrt(np.abs(np.fft.fftfreq(data_size)[1:]) + 1e-10))                * np.exp(2j * np.pi * np.random.random(data_size - 1))            ])        )    ))decay_envelope = 2.25 * np.exp(-2.25 * x / x.max()) + 0.25pink_noise = pink_noise * decay_envelopeseries_2 = sin_wave + 0.4 * pink_noise# Создание синуса с выбросами и броуновским движениемbrownian = np.cumsum(np.random.normal(0, 0.5, data_size))extreme_values = np.zeros(data_size)outlier_indices = np.random.choice(data_size, size=int(0.5 * data_size), replace=False)extreme_values[outlier_indices] = np.random.uniform(-1, 1, size=len(outlier_indices))series_3 = sin_wave + extreme_values + 0.4 * browniandf = pd.DataFrame({    'series_1': series_1,    'series_2': series_2,    'series_3': series_3})  

Мы хотим найти такие функции g_j(X), чтобы выходные сигналы y_j(t) = g_j(X(t)) изменялись наиболее медленно. Это достигается минимизацией следующего функционала:

Δ_j = ⟨\dot{y}_j(t)^2⟩,

Где \dot{y}_j производная по времени, а треугольные скобки ⟨⟩ означают усреднение по времени. Так как у нас дискретный случай, то производная по времени будет считаться следующим образом:

\dot{y}_j \approx y_j(t+1) - y_j(t).

Для статьи был выбран самый простой способ, но при необходимости можно использовать и центральные разности:

\dot{y}_j \approx \frac {y_j(t + 1) - y_j(t - 1)}{2}.

Как и у многих оптимизационных задач здесь есть ограничения:

  1. Нулевое среднее. Это нужно чтобы исключить неинформативные константные признаки.

\frac{1}{T} \sum_{t=1}^{T} y_j(t) = 0.

  1. Единичная дисперсия. Все признаки нужно привести к одному масштабу.

\frac{1}{T} \sum_{t=1}^{T} (y_j(t))^2 = 1.

  1. Некоррелированность. Это нужно чтобы каждый следующий признак не дублировал информацию.

\frac{1}{T} \sum_{t=1}^{T} y_i(t) \cdot y_j(t) = 0, \quad \text{при } i \neq j.

В настоящий момент данные образуют следующее облако точек:

Чтобы наши данные удовлетворяли этим требованиям мы сделаем следующие действия:

Для центрирования следует вычесть из каждого элемента среднее:

\tilde{x}(t) = x(t) - \overline{x}.

series_1

series_2

series_3

0

0.3142

-0.6037

-1.6258

1

-0.1079

0.2502

-1.0593

2

-0.0660

-0.3006

-1.4797

3

-0.1498

0.1304

-0.8344

4

0.2630

0.4444

-1.0654

Код на Python
df_centr = pd.DataFrame()for column in df.columns:    df_centr[column] = df[column] - df[column].mean()  

Вычисляем ковариационную матрицу. Ковариационная матрица признаков определяется как математическое ожидание внешнего произведения центрированного вектора этих признаков, то получается следующее:

A = ⟨\tilde{x}(t)\tilde{x}^T(t)⟩.

series_1

series_2

series_3

series_1

0.5298

0.4110

-0.2132

series_2

0.4110

0.6339

-0.1706

series_3

-0.2132

-0.1706

0.9572

Код на Python
A = (df_centr.T @ df_centr) / (len(df_centr) - 1)  

или

A = np.cov(df_centr.T)  

Теперь нужно сделать сферизацию, ZCA whitening или, как это еще называют, отбеливание Махаланобиса. Для этого нужно выполнить сингулярное разложение ковариационной матрицы. Получится следующее: UDU^{T}, важно заметить, что для наших целей элементы матрицы D будут в степени -\frac{1}{2}. Полученная формула будет такой:

A_s = UD^{-\frac{1}{2}}U^{T}.

Далее нам нужно умножить:

\tilde{X} = \tilde{X}A_s.

Матрица A_s выполняет преобразование, которое приводит данные к единичной ковариационной матрице ⟨\tilde{x}_s\tilde{x}_s^{T}⟩ = I, где I — единичная матрица.

Код на Python
# Выполняем сингулярное разложение матрицы AU, D, Vt = np.linalg.svd(A)# Создаём диагональную матрицу из обратных квадратных корней сингулярных значенийD_inv_sqrt = np.diag(D ** (-0.5))  # Умножение U @ D_inv_sqrt @ U.T даёт матрицу, которая будет использоваться для сферирования данныхA_s = U @ D_inv_sqrt @ U.T# Применяем сферирование к dfdf_sphered = df_centr @ A_s  

series_1

series_2

series_3

0

0.7398

-1.2908

-1.6972

1

-0.5523

0.4318

-1.1252

2

-0.1557

-0.5195

-1.5904

3

-0.5084

0.2768

-0.9004

4

-0.0018

0.4875

-1.0604

Связаны ли между собой расстояние и отбеливание Махаланобиса?

Если кратко: ZCA-отбеливание превращает расстояние Махаланобиса в обычное Евклидово расстояние.

Расстояние Махаланобиса измеряет удалённость точки x от центра облака данных \mu с учётом формы этого облака (ковариации \Sigma):

D_M(x) = \sqrt{(x - \mu)^T \Sigma^{-1} (x - \mu)}.

ZCA-отбеливание (то самое, которое мы применяем в SFA) преобразует данные по формуле:

x_{\text{white}} = \Sigma^{-1/2} (x - \mu).

Теперь посчитаем обычное Евклидово расстояние от нуля до x_{\text{white}} в отбеленном пространстве:

\|x_{\text{white}}\|^2 = x_{\text{white}}^T x_{\text{white}} = (x - \mu)^T \Sigma^{-1/2} \Sigma^{-1/2} (x - \mu) = (x - \mu)^T \Sigma^{-1} (x - \mu) = D_M^2(x).

Расстояние Махаланобиса в исходном пространстве равно обычному Евклидову расстоянию в отбеленном пространстве.

После отбеливания облако точек выглядит так:

Вычисляем производные сферированных данных:

\dot{x}(t) = \tilde{x}(t+1) - \tilde{x}(t).

series_1

series_2

series_3

0

-1.2920

1.7227

0.5721

1

0.3966

-0.9514

-0.4653

2

-0.3527

0.7964

0.6900

3

0.5065

0.2106

-0.1600

4

-1.4053

1.0828

-0.0462

Примечание: усреднение ⟨⟩ идет по t от 1 до T-1, так как для последнего элемента T нельзя вычислить \dot{x}(T) по формуле x(T+1) - x(T).

Код на Python
df_derivative = df_sphered.diff().dropna().reset_index(drop=True)  

Центрируем полученные производные:

\tilde{\dot{x}}(t) = \dot{x}(t) - \overline{\dot{x}}.

series_1

series_2

series_3

0

-1.2913

1.7195

0.5629

1

0.3974

-0.9545

-0.4744

2

-0.3520

0.7932

0.6809

3

0.5073

0.2075

-0.1692

4

-1.4046

1.0797

-0.0553

Код на Python
for column in df_derivative.columns:    df_derivative[column] = df_derivative[column] - df_derivative[column].mean()  

Создаем матрицу ковариации B из полученных данных.

B = ⟨\tilde{\dot{x}}(t)\tilde{\dot{x}}^T(t)⟩.

series_1

series_2

series_3

series_1

0.3402

-0.2491

0.0657

series_2

-0.2491

0.4223

0.0531

series_3

0.0657

0.0531

0.4133

Код на Python
B = (df_derivative.T @ df_derivative) / (len(df_derivative) - 1)  

или

B = np.cov(df_derivative.T)  

Вспомним, что мы хотели минимизировать среднюю квадратичную скорость изменения проекции:

Δ_j = ⟨\dot{y}(t)^2⟩⟨\dot{y}(t)^2⟩ = w^{T}⟨\tilde{\dot{x}}(t)\tilde{\dot{x}}^{T}(t)⟩w.

А ⟨\tilde{\dot{x}}(t)\tilde{\dot{x}}^{T}(t)⟩ есть ни что иное как матрица ковариации B. Поэтому:

⟨\dot{y}(t)^2⟩ = w^{T}Bw

Примечание: для общности вывода мы сначала рассмотрим случай без сферизации, где ограничение записывается как w^{T}Aw = 1. Затем подставим A=I как частный случай.

Для поиска решений используем метод множителей Лагранжа.
Чтобы найти значение j-того признака запись формулы будет иметь следующий вид:

L(w, \lambda, \mu_1, ... \mu_{j}) = w_j^{T}Bw_j - \lambda (w_j^{T}Aw_j - 1) - \sum_{i=1}^{j - 1} \mu_i w_j^{T}Aw_i.

Берем производную по w и приравниваем к нулю:

\frac{\partial L}{\partial w_j} = 2Bw_j - 2\lambda Aw_j - \sum_{i=1}^{j - 1} \mu_i Aw_i = 0.

Умножаем слева на w_k^T A^{-1}, где k — индекс любого уже найденного решения (k < j).

У нас получается:

2w_k^{T}B_sw_j - 2\lambda w_k^{T}Aw_j - \sum_{i=1}^{j - 1} \mu_i w_k^{T}Aw_i = 0.

Из-за условия ортогональности w_k^{T}Aw_j=0 и w_k^{T}Bw_j​=0.

- \sum_{i=1}^{j - 1} \mu_i w_k^{T}Aw_i = 0.

Так как при i = k мы получаем w_k^{T}A_sw_k = 1. Из этого у нас следует то, что \mu_i = 0 и мы можем переписать уравнение в следующем виде:

B_sw_j = \lambda_jA_sw_j.

Эта задача сводится к поиску собственных векторов матрицы (A_s^{1/2})^T B A_s^{1/2}.

Так как ранее предварительно сферировали данные, их ковариационная матрица стала единичной — A_s = I, поэтому получается Bw_j = \lambda_jIw_j, что можем записать еще более просто: Bw_j = \lambda_jw_j.

Где \lambda_j это собственное значение, чем оно меньше, тем более медленный признак. Поэтому сортируем по возрастанию значения \lambda_1 < \lambda_2 <…<\lambda_D

В результате вычисления собственных значений матрицы B получаем:

\lambda_1 = 0.1055, \quad \lambda_2 = 0.4364, \quad \lambda_3 = 0.6337

Здесь видно, что первая компонента изменяется практически в 6 раз медленнее третьей.

Признаки получатся из решения y_j(t) = w^{T}_j\tilde{x}, где y_1(t) = w^{T}_1\tilde{x} самый медленный признак, y_2 следующий и так далее. Поэтому решение будет выглядеть как:

y(t) = [y_1, y_2,... y_D]^{T} = W^{T}\tilde{x}(t).

Где W = [w_1, w_2,... w_D] матрица собственных весов.

slow_feature_1

slow_feature_2

slow_feature_3

0

0.1892

-1.7225

-1.4461

1

0.1606

-1.1158

0.6976

2

-0.0173

-1.6569

-0.2794

3

0.0368

-0.9182

0.5490

4

0.5832

-0.9358

0.3825

Код на Python
# Вычисляем собственные значения и собственные векторы марицы Blambdas, w = np.linalg.eigh(B)# Получаем индексы для сортировки собственных значенийsort_idx = np.argsort(lambdas)# Сортируем собственные значенияlambdas_sorted = lambdas[sort_idx]# Сортируем собственные векторыw_sorted = w[:, sort_idx]# Эвристика, которая нужна для воспроизводимости, так как иногда из-за особенностей вычисления график будет отражаться. # Например библиотека sksfa может вернуть перевёрнутый график # по сравнению с ручной реализацией. # Умножение на -1 исправит его положение.for i in range(w_sorted.shape[1]):    if np.sum(w_sorted[:, i]) < 0:        w_sorted[:, i] *= -1# Умножаем матрицу данных на матрицу отсортированных # собственных векторов, получая медленные признаки.y = df_sphered @ w_sorted  

Вот как выглядят графики медленных признаков:

Рассмотрим вклад исходных признаков в полученные решения:

Веса для признака 1

Веса для признака 2

Веса для признака 3

series_1

0.8827

0.3968

-1.7422

series_2

0.4866

0.2028

1.7019

series_3

-0.1265

1.0609

-0.0792

Код на Python
w_original = A_s @ w_sortedweights_df = pd.DataFrame(    w_original,    index=df_sphered.columns,    columns=[f'Веса для медленного признака {i+1}' for i in range(w_original.shape[1])])print("Вклад исходных признаков в каждую медленную компоненту:")display(weights_df)  

Как видно в таблице, модель присваивает в самой медленной компоненте наибольший вес первому, самому стабильному признаку. Мы можем изменять параметры и полученные результаты подтвердят то, что SFA склонен отказываться от наиболее шумных источников.

Реализация алгоритма с помощью библиотеки sksfa

В интернете есть реализации этого алгоритма. Я рассмотрю библиотеку sksfa для описанной задачи:

Код с использованием sksfa

from sksfa import SFAimport numpy as npimport pandas as pdsfa = SFA(n_components=3)y_sksfa = sfa.fit_transform(df.values)w, b = sfa.affine_parameters()# Чтобы график компонент не был перевернутым, # а веса с противоположным знаком, умножу w и y_sksfa на -1w[0, :] *= -1w[2, :] *= -1y_sksfa[:,0] *= -1y_sksfa[:,2] *= -1weights_df = pd.DataFrame(    w.T,    index=df.columns,    columns=[f'Веса для медленного признака {i+1}' for i in range(w.shape[1])])print("Вклад исходных признаков в каждую медленную компоненту (sksfa):")display(weights_df)  

Веса для признака 1

Веса для признака 2

Веса для признака 3

series_1

0.8827

0.3968

-1.7422

series_2

0.4866

0.2028

1.7019

series_3

-0.1265

1.0609

-0.0792

Собственные значения, полученные через sksfa, совпадают с ручным расчётом:

\lambda_1 = 0.1055, \quad \lambda_2 = 0.4364, \quad \lambda_3 = 0.6337.

Сравнение показало, что ручная реализация SFA и библиотека sksfa работают идентично. Собственные значения \lambda_j тоже совпадают, а после приведения знаков весов и компонент к общему виду совпадают веса и сами медленные признаки. Так что вы можете использовать любое удобное решение в своих проектах.

В работе алгоритма как и у многих моделей машинного обучения есть ограничения — он хуже работает, если в данных имеются выбросы или нарушается стацинарность сигнала. Кроме этого, если модель учится на нескольких инвариантах, ее обобщающие качества будут ухудшаться.

Полезные ссылки:

  1. Оригинальная статья: Wiskott, L. and Sejnowski, T. J. (2002). Slow feature analysis: Unsupervised learning of invariances. Neural computation, 14(4):715-770.

  2. sksfa — implementation of Slow Feature Analysis for scikit-learn.

  3. Код в Jupyter Notebook — юпитер ноутбук с кодом из статьи.

ссылка на оригинал статьи https://habr.com/ru/articles/1025754/