Ранжируем треки с помощью TRIBE и RBF

от автора

Ощущение — нравится трек или нет, хочется ли его переслушать возникает во время обработки звука мозгом. Поэтому вместо того, чтобы напрямую предсказывать «качество» музыки по спектрограммам или эмбеддингам, можно построить промежуточное представление: сначала оценить, какие паттерны активности коры вызывает аудио, а затем уже по этим паттернам предсказывать относительную популярность треков. Для предсказания активности коры использовалась нейросеть TRIBE.

TRIBE — это модель brain encoding: она получает стимул и предсказывает, какой отклик он вызовет в коре головного мозга. Изначально TRIBE работает с видео и объединяет три потока признаков — текст, изображение и звук. В этой статье используется только аудио: аудио файл превращается в последовательность векторов, описывающих на предсказанную активность коры.

Практически это означает следующее. На вход подаётся аудио файл, на выходе — матрица:

X^{\mathrm{TRIBE}} \in \mathbb{R}^{T \times D},

где T — число временных фрагментов, а D — число признаков корковой активности. D составляет порядка 20 тысяч, где каждое значение соответствует активности определенного участка коры. Таким образом, один трек превращается в динамику предсказанной реакции мозга по мере звучания музыки.

В качестве исходных данных используется Free Music Archive (далее FMA): это открытый датасет для задач Music Information Retrieval: классификации жанров, рекомендаций, поиска похожей музыки, анализа метаданных. Полная версия FMA содержит больше 100 тысяч треков, но в эксперименте использовался вариант small: 8000 mp3-фрагментов по 30 секунд, 8 сбалансированных жанров. Для этой задачи важен не жанр, а поля из tracks.csv: идентификатор трека, идентификатор альбома и число прослушиваний.

Идея эксперимента такая: если TRIBE действительно сохраняет в своих выходах часть информации о том, как звук обрабатывается мозгом, то в этих признаках может быть слабый сигнал, связанный с тем, какой трек слушатели выбирают чаще. Поставим задачу так: взять два трека из одного альбома и предсказать, какой из них набрал больше прослушиваний. Заметим, что сравнение происходит именно внутри одного альбома — прослушивания плохо сравниваются между разными артистами и релизами: у одного исполнителя 10 тысяч прослушиваний могут быть провалом, у другого — верхней границей аудитории.

Итоговый пайплайн получился такой:

  1. взять подмножество FMA small (не весь датасет, так как TRIBE требовательна к ресурсам);

  2. сгруппировать треки по альбомам;

  3. для каждого трека сохранить число прослушиваний из метаданных;

  4. прогнать аудио через TRIBE и получить матрицу признаков;

  5. сжать выход TRIBE через StandardScaler -> PCA -> StandardScaler;

  6. построить пары сравнений внутри каждого альбома;

  7. обучить небольшую RBF-сеть, которая выдаёт скалярную оценку трека;

  8. проверить accuracy на парах: сколько раз модель поставила более прослушиваемый трек выше.

Результат: около 85% pairwise accuracy на train и около 58% на test. Это слабый, но достоверный сигнал. Ниже — как именно он был получен.

Что именно предсказывается

На входе есть трек. После TRIBE он превращается в последовательность векторов X^{\mathrm{TRIBE}}.

Обучаемая модель считает скаляр f(X) для каждого трека. Для пары треков из одного альбома требуется:

f(X_{\mathrm{popular}}) > f(X_{\mathrm{less\ popular}})

Это важное упрощение. Оно делает задачу устойчивее к разным масштабам популярности между альбомами, жанрами и артистами.

Подготовка FMA

В FMA метаданные лежат в tracks.csv с многоуровневым заголовком. Для эксперимента нужны три поля:

  • track_id — чтобы найти mp3-файл;

  • album_id — чтобы группировать сравнения;

  • track/listens — число прослушиваний, используемое как рейтинг.

Фрагмент подготовки метаданных:

from pathlib import Pathimport pandas as pdMETA_DIR = Path("data/fma_metadata")TRACKS_CSV = META_DIR / "tracks.csv"tracks = pd.read_csv(      TRACKS_CSV,    index_col=0,     header=[0, 1],     engine="python",      on_bad_lines="skip",)tracks = tracks[tracks[("set", "subset")] == "small"].copy()tracks["track_id"] = tracks.index.astype(int)tracks["album_id"] = tracks[("album", "id")]tracks["popularity"] = tracks[("track", "listens")]tracks = tracks[["track_id", "album_id", "popularity"]].dropna()

Альбомы с 1-2 треками бесполезны: внутри них нечего сравнивать. Поэтому я оставлял только альбомы, где есть минимум 3 трека:

MIN_TRACKS_IN_ALBUM = 3album_sizes = tracks.groupby("album_id").size()valid_album_ids = album_sizes[album_sizes >= MIN_TRACKS_IN_ALBUM].indextracks = tracks[tracks["album_id"].isin(valid_album_ids)].copy()

Разбиение train/test тоже делается по альбомам.

import randomSEED = 42TEST_FRAC = 0.2rng = random.Random(SEED)album_ids = list(tracks["album_id"].unique())rng.shuffle(album_ids)n_test = int(len(album_ids) * TEST_FRAC)test_albums = set(album_ids[:n_test])train_albums = set(album_ids[n_test:])tracks["split"] = tracks["album_id"].apply(    lambda album_id: "test" if album_id in test_albums else "train")

Файлы удобно разложить так:

data/fma_dataset/  train/            album_123/             000001.mp3                000002.mp3               ratings.json     test/           album_456/           000101.mp3               000102.mp3               ratings.json

ratings.json хранит только локальную таблицу популярности внутри папки альбома:

{      "000001": 1534,     "000002": 847,      "000003": 2681}

На этапе построения пар каждая папка альбома становится самостоятельным объектом: список матриц треков плюс список пар (better_idx, worse_idx).

Прогон аудио через TRIBE

TRIBE используется как фиксированный экстрактор признаков. Код ниже показывает сам инференс без деталей.

from pathlib import Pathimport numpy as npfrom tribev2.demo_utils import TribeModelDATASET_PATH = Path("data/fma_dataset")CACHE_DIR = Path("cache/tribe")model = TribeModel.from_pretrained(             "facebook/tribev2",              cache_folder=str(CACHE_DIR))for audio_path in sorted(DATASET_PATH.rglob("*.mp3")):   events = model.get_events_dataframe(audio_path=str(audio_path))    events = events[events["type"] == "Audio"]     preds, segments = model.predict(events=events)     preds = np.asarray(preds, dtype=np.float32)     np.save(audio_path.with_suffix(".npy"), preds)

После этого рядом с каждым mp3 появляется npy:

000001.mp3000001.npyratings.json

T зависит от длительности трека и от того, как TRIBE режет аудио на события. D велико, поэтому сразу обучать на исходном выходе неудобно из-за большой размерности.

Сжатие выхода TRIBE

Соседние или функционально близкие зоны мозга в таком представлении могут давать коррелированные значения. Поэтому перед RBF-моделью использовалось сжатие:

\mathrm{StandardScaler}_1 \rightarrow \mathrm{PCA} \rightarrow \mathrm{StandardScaler}_2

PCA оставляет 16 компонент (explained variance ~ 95%):

from pathlib import Pathimport joblibimport numpy as npfrom sklearn.pipeline import Pipelinefrom sklearn.decomposition import PCAfrom sklearn.preprocessing import StandardScalerDATASET_PATH = Path("data/fma_dataset")PREPROCESSOR_PATH = Path("models/preprocessor.pkl")N_COMPONENTS = 16npy_files = sorted((DATASET_PATH / "train").rglob("*.npy"))blocks = []for path in npy_files:      x = np.load(path)      blocks.append(x.astype(np.float32))X_train_frames = np.concatenate(blocks, axis=0)preprocessor = Pipeline([                             ("scaler_1", StandardScaler()),      ("pca", PCA( n_components=N_COMPONENTS, svd_solver="randomized", random_state=42, )),      ("scaler_2", StandardScaler()),])X_debug = preprocessor.fit_transform(X_train_frames)pca = preprocessor.named_steps["pca"]PREPROCESSOR_PATH.parent.mkdir(parents=True, exist_ok=True)joblib.dump(preprocessor, PREPROCESSOR_PATH)

После этого каждый трек описывается матрицей:

X \in \mathbb{R}^{T \times 16}

Формирование пар для сравнения

Число прослушиваний зависит от релиза, аудитории, попадания в подборки, времени публикации, названия артиста, обложки и ещё десятков факторов. Если обучать регрессию audio -> listens, модель будет пытаться объяснить аудиосигналом то, чего в аудио нет.

Поэтому используем pairwise ranking. Для каждого альбома берутся пары треков. Если один трек имеет больше прослушиваний, он считается better, второй — worse. Но сравнивать слишком близкие значения тоже плохо: разница между 1000 и 1020 прослушиваниями может быть шумом. Поэтому рейтинг переводится в логарифм, а пары с маленькой разницей отбрасываются.

Для альбома с рейтингами r_i:

s_i = \log_{10}(r_i + \varepsilon)

Пара используется только если:

|s_i - s_j| \geq \sigma_{\mathrm{album}},

где \sigma_{\mathrm{album}} — стандартное отклонение логарифмических рейтингов внутри альбома.

Код построения пар:

from itertools import combinationsimport numpy as npMIN_SIGMA_TO_COMPARE = 1.0EPS = 1e-8def build_album_pairs(ratings):      scores = np.asarray(ratings, dtype=np.float32)      log_scores = np.log10(scores + EPS)      sigma = np.std(log_scores)      threshold = MIN_SIGMA_TO_COMPARE * sigma      pairs = []      for i, j in combinations(range(len(log_scores)), 2):            diff = abs(log_scores[i] - log_scores[j])            if diff < threshold:                  continue            if log_scores[i] > log_scores[j]:      pairs.append((i, j))            else:                  pairs.append((j, i))      return pairs

Здесь (i, j) означает: трек i должен получить оценку выше, чем трек j.

Сборка датасета для обучения

На этом этапе каждая папка альбома превращается в структуру:

dataset = [               {                    "tracks": [track_matrix_0, track_matrix_1, ...],                   "pairs": [(better_idx, worse_idx), ...],               },              ...,]

Код:

from pathlib import Pathimport jsonimport joblibimport numpy as npDATASET_PATH = Path("data/fma_dataset")PREPROCESSOR_PATH = Path("models/preprocessor.pkl")preprocessor = joblib.load(PREPROCESSOR_PATH)def load_split(split_name):      split_dir = DATASET_PATH / split_name      albums = []      for album_dir in sorted(p for p in split_dir.iterdir() if p.is_dir()):    with open(album_dir / "ratings.json", "r", encoding="utf-8") as f:      ratings_dict = json.load(f)      tracks = []      ratings = []      for npy_path in sorted(album_dir.glob("*.npy")):        track_name = npy_path.stem        if track_name not in ratings_dict:          continue                   x = np.load(npy_path)         x = preprocessor.transform(x).astype(np.float32)                tracks.append(x)        ratings.append(float(ratings_dict[track_name]))               if len(tracks) < 2:        continue                   pairs = build_album_pairs(ratings)             if not pairs:         continue             albums.append({                      "tracks": tracks,                       "pairs": pairs,              })    return albumstrain_dataset = load_split("train")test_dataset = load_split("test")

Важный момент: pairs содержат индексы внутри альбома. Это не глобальные индексы файлов. Поэтому при обучении пара хранится как (album_idx, better_idx, worse_idx):

def flatten_pairs(dataset):      result = []      for album_idx, album in enumerate(dataset):            for better_idx, worse_idx in album["pairs"]:                  result.append((album_idx, better_idx, worse_idx))      return resulttrain_pairs = flatten_pairs(train_dataset)test_pairs = flatten_pairs(test_dataset)

Модель: RBF поверх временной последовательности

После препроцессора трек — это последовательность векторов:

\mathbf{b}_j \in \mathbb{R}^{16}, \qquad j = 1, \ldots, N_{\mathrm{bases}}

Для предсказания популярности используем RBF-сеть. RBF-сеть (Radial Basis Function network) — это модель, которая сравнивает входные данные с набором «эталонных» паттернов. Каждый RBF-узел отвечает за некоторую область в пространстве признаков: если вход похож на соответствующий паттерн, отклик узла большой; если далёк — отклик б ыстро падает.
В эксперименте RBF-модель содержит N_{\mathrm{bases}} = 8 центров:

\mathbf{b}_j \in \mathbb{R}^{16}, \qquad j = 1, \ldots, N_{\mathrm{bases}}

Для каждого кадра \mathbf{x}_t считается взвешенное расстояние до каждого центра.

d_{tj}^{2} = \sum_{k=1}^{16} w_k (x_{tk} - b_{jk})^2

w_k > 0 — обучаемая важность k-й компоненты. Чтобы вес всегда был положительным, в коде хранится сырой параметр w_raw, а реальный вес считается через softplus:

w_k = \operatorname{softplus}(w_{\mathrm{raw}, k})

Отклик:

K_{tj} = \exp(-\gamma_j d_{tj}^{2}),

где \gamma_j > 0 — обучаемая ширина j-го базиса. Большой \gamma_j делает базис узким: он реагирует только на близкие к центру паттерны.

Скалярная оценка кадра:

z_t = \sum_{j=1}^{N_{\mathrm{bases}}} v_j K_{tj}

Скалярная оценка трека — среднее по времени:

f(X) = \frac{1}{T} \sum_{t=1}^{T} z_t

Полная модель на PyTorch:

import torchimport torch.nn as nndef inv_softplus(x):      x = torch.as_tensor(x, dtype=torch.float32)      return torch.log(torch.expm1(x))class RBFSequenceModel(nn.Module):      def __init__(self, n_bases, dim, gamma_init=0.01, w_init=1.0):  super().__init__()         self.n_bases = n_bases      self.dim = dim        self.b = nn.Parameter(torch.randn(n_bases, dim))      self.gamma_raw = nn.Parameter(inv_softplus(gamma_init).repeat(n_bases))      self.v = nn.Parameter(torch.randn(n_bases) * 0.01)         self.w_raw = nn.Parameter(inv_softplus(w_init).repeat(dim))       self.softplus = nn.Softplus()       def w(self):          return self.softplus(self.w_raw)      def gamma(self):           return self.softplus(self.gamma_raw)     def forward(self, x):          diff = x[:, None, :] - self.b[None, :, :]        dist2 = (diff ** 2 * self.w()[None, None, :]).sum(dim=2)        k = torch.exp(-self.gamma()[None, :] * dist2)        z = (k * self.v[None, :]).sum(dim=1)          return z.mean()

Лосс: softplus от отрицательного margin

Для пары (better, worse) модель считает:

y_{\mathrm{better}} = f(X_{\mathrm{better}}), \qquad y_{\mathrm{worse}} = f(X_{\mathrm{worse}})

Margin:

m = y_{\mathrm{better}} - y_{\mathrm{worse}}

Если m > 0, порядок правильный. Если m < 0, модель ошиблась. Лосс:

\mathcal{L} = \log(1 + \exp(-m))

В PyTorch это F.softplus(-margin):

import torch.nn.functional as Fdef pairwise_batch_loss(model, dataset, batch):     losses = []       for album_idx, better_idx, worse_idx in batch:         album = dataset[album_idx]            x_better = album["tracks"][better_idx]       x_worse = album["tracks"][worse_idx]          y_better = model(x_better)           y_worse = model(x_worse)          margin = y_better - y_worse          losses.append(F.softplus(-margin))    return torch.stack(losses).mean()

Метрика

Метрика выбираем, исходя из постановки задачи: доля правильно упорядоченных пар.

\operatorname{accuracy} = \frac{\#\{(i, j): f(X_i) > f(X_j)\}} {\#\{(i, j)\}}

import randomimport torch@torch.no_grad()def evaluate_accuracy(model, dataset, pairs, max_pairs=None):   model.eval()     if max_pairs is not None and len(pairs) > max_pairs:          eval_pairs = random.sample(pairs, max_pairs)     else:            eval_pairs = pairs     correct = 0     for album_idx, better_idx, worse_idx in eval_pairs:        album = dataset[album_idx]         y_better = model(album["tracks"][better_idx])          y_worse = model(album["tracks"][worse_idx])            correct += int(y_better.item() > y_worse.item())       return correct / len(eval_pairs)

У этой метрики есть понятная случайная база: 50%. Если модель ничего не выучила, она будет примерно угадывать знак пары.

Обучение

Параметры эксперимента:

N_BASES = 8N_EPOCHS = 250BATCH_SIZE = 16LR = 5e-3

Минимальный цикл обучения:

import randomimport numpy as npimport torchdef batchify(items, batch_size, shuffle=True):      indices = list(range(len(items)))     if shuffle:            random.shuffle(indices)     for start in range(0, len(indices), batch_size):        batch_indices = indices[start:start + batch_size]          yield [items[i] for i in batch_indices]    def convert_dataset_to_torch(dataset, device):    converted = []      for album in dataset:      tracks = [                  torch.tensor(x, dtype=torch.float32, device=device)          for x in album["tracks"]            ]           converted.append({                       "tracks": tracks,                        "pairs": album["pairs"],    })     return convertedrandom.seed(SEED)np.random.seed(SEED)torch.manual_seed(SEED)DEVICE = "cuda" if torch.cuda.is_available() else "cpu"train_dataset_torch = convert_dataset_to_torch(train_dataset, DEVICE)test_dataset_torch = convert_dataset_to_torch(test_dataset, DEVICE)train_pairs = flatten_pairs(train_dataset_torch)test_pairs = flatten_pairs(test_dataset_torch)DIM = train_dataset_torch[0]["tracks"][0].shape[1]model = RBFSequenceModel(n_bases=N_BASES, dim=DIM).to(DEVICE)optimizer = torch.optim.AdamW(    model.parameters(),    lr=LR)train_losses = []train_accs = []test_accs = []for epoch in range(1, N_EPOCHS + 1):     model.train()     epoch_losses = []    for batch in batchify(train_pairs, BATCH_SIZE, shuffle=True):        optimizer.zero_grad()           loss = pairwise_batch_loss(model, train_dataset_torch, batch)       loss.backward()             optimizer.step()            epoch_losses.append(loss.item())        train_loss = float(np.mean(epoch_losses))     train_acc = evaluate_accuracy(model, train_dataset_torch, train_pairs)    test_acc = evaluate_accuracy(model, test_dataset_torch, test_pairs)      train_losses.append(train_loss)     train_accs.append(train_acc)      test_accs.append(test_acc)        print(        f"epoch={epoch:03d} "                 f"loss={train_loss:.6f} "                f"train_acc={train_acc * 100:.2f}% "                f"test_acc={test_acc * 100:.2f}%"        )

Результаты

На текущем запуске модель дала примерно такие значения:

train accuracy: около 85%test accuracy:  около 58%
Рис. 1 Динамика accuracy

Рис. 1 Динамика accuracy

Это похоже на переобучение, и оно ожидаемо: данных мало, выход TRIBE сжат до 16 компонент, RBF-центров всего 8, но число факторов популярности намного больше, чем акустический сигнал.

При этом 58% на test всё равно выше случайных 50%. Я бы не называл это сильным результатом. Корректнее сказать так: в TRIBE-представлении есть слабый сигнал, связанный с популярностью треков .

Интерпретация параметров модели

Так как PCA удаляет признаки с высокой корреляцией (т. е. близкие участки коры), можно предположить, что каждая компонента вектора x описывает активность какого-то определенного участка мозга, а соответствующая компонента вектора w — важность этого участка для конечного предсказания. Очевидно, не вся кора отвечает за реакцию на музыку, что видно из графика ниже.

Рис. 2 Распределение компонент w

Рис. 2 Распределение компонент w

Каждый вектор b описывает некоторое фиксированное распределение активности коры, а скаляр v — то, насколько «ок/не ок» это состояние ощущается.

Вывод

Эксперимент не доказывает, что популярность музыки можно предсказывать по «мозговым» признакам. Он показывает более узкую вещь: если взять TRIBE как фиксированный экстрактор, сжать его выход до 16 компонент и обучить RBF‑модель на попарном ранжировании треков внутри альбомов, получается test accuracy около 58% против случайных 50%.

Для практической рекомендательной системы этого мало. Для технического baseline — уже достаточно интересно: пайплайн воспроизводим, модель маленькая, лосс соответствует задаче, а результат можно сравнивать с более простыми и более сильными аудиопризнаками.

Главный вывод для меня: такую задачу нельзя честно формулировать как «предсказание популярности музыки». Но её можно формулировать как проверку слабого ранжирующего сигнала в нейрофизиологически мотивированном представлении аудио. И в этой формулировке результат не выглядит случайным, хотя до убедительной модели ещё далеко.

Примечание: существует концептуально аналогичная работа: Virality Predictor. Статья была написана за день до выхода этого продукта, и её можно рассматривать, как попытку сделать локально‑запускаемый self‑made аналог.

ссылка на оригинал статьи https://habr.com/ru/articles/1041184/