Продвинутое использование библиотеки PYTORCH: от подготовки данных до визуализации

от автора

PyTorch — современная библиотека машинного обучения с открытым исходным кодом, разработанная компанией Facebook. Как и другие популярные библиотеки, такие как TensorFlow и Keras, PyTorch позволяет использовать вычислительную мощность видеокарт, автоматически составлять граф вычислений, дифференцировать и считать его. Но, в отличие от предыдущих библиотек, обладает более гибким функционалом, благодаря тому, что использует динамический граф вычислений.

Сейчас мы пройдем все этапы работы с библиотекой PyTorch. Мы затронем далеко не все возможности данной библиотеки, но их хватит, чтобы начать с ней работать. Научимся пользоваться инструментами для подготовки данных, которые делают загрузку данных легкой и уменьшают объем написанного кода. Создадим простую нейросеть, а также класс, который будет ее обучать и который можно будет применить для обучения любой модели, созданной в PyTorch. В конце мы визуализируем результат, чтобы оценить качество обученной модели.

Для начала загрузим нужные библиотеки:

import torch from torch.utils.data import Dataset, DataLoader import matplotlib.pyplot as plt from sklearn.datasets import make_moons from sklearn.model_selection import train_test_split import copy import datetime as dt import pandas as pd import numpy as np

Воспользуемся генератором датасета make_moons из библиотеки sklearn, который генерирует два класса в двумерном пространстве в виде полумесяца. Добавим немного шума при генерации (переменная noise)

X, y = make_moons(n_samples=150, random_state=33, noise=0.2)

Обернем данные в pandas.DataFrame для наглядности:

df = pd.DataFrame(X, columns=['x1', 'x2']) df['target'] = y df.head(5)

Раделим датасет на тренировочную и тестовую выборки с соотношением 6:4 при помощи функции train_test_split из библиотеки sklearn

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=33)

Визуализируем получившееся разделение:

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12,5))  ax1.scatter(X_train[:,0][y_train == 0], X_train[:,1][y_train == 0], c='red', marker='o', edgecolors = 'black', alpha = 0.6) ax1.scatter(X_train[:,0][y_train == 1], X_train[:,1][y_train == 1], c='green', marker = 'o', edgecolors = 'black', alpha = 0.6) ax1.set_title('Train', fontsize=20)  ax2.scatter(X_test[:,0][y_test == 0], X_test[:,1][y_test == 0], c='red', marker='o', edgecolors = 'black', alpha = 0.6) ax2.scatter(X_test[:,0][y_test == 1], X_test[:,1][y_test == 1], c='green', marker = 'o', edgecolors = 'black', alpha = 0.6) ax2.set_title('Test', fontsize=20);

Создадим класс MyDataset, который наследуется от класса torch.utils.data.Dataset. Переопределим методы __len и __getitem, чтобы при вызове соответствующих методов мы получали объем выборки и пару значение-метка.

class MyDataset(Dataset):        def __init__(self, X, y):         self.X = torch.Tensor(X)         self.y = torch.from_numpy(y).float()      def __len__(self):         return self.X.shape[0]        def __getitem__(self, index):         return (self.X[index], self.y[index])

Сделаем нашу игрушечную нейросеть с тремя полно связными слоями. Между слоями поместим функции активации ReLU, а в третьем слое — сигмоиду.Первый слой будет принимать на вход значение x1 и x2 и на выходе давать 50 значений. Второй будет принимать 50 значений из первого слоя и подавать 50 значений на третий слой. А уже третий слой будет сжимать эти 50 значений до одного в интервале от 0 до 1 (это будет вероятность отнесения наблюдения к классу «1»).

import torch.nn as nn  ReLU = nn.ReLU()  class Net(nn.Module):     def __init__(self):         super(Net, self).__init__()         self.fc1 = nn.Linear(2, 50)         self.fc2 = nn.Linear(50, 50)         self.fc3 = nn.Linear(50, 1)      def forward(self, x):         x = ReLU(self.fc1(x))         x = ReLU(self.fc2(x))         x = torch.sigmoid(self.fc3(x))         return x.view(-1)

Создадим экземпляр нашей нейросети и выведем ее структуру. Мы увидим количество слоев, количество параметров на входе и выходе у каждого слоя и наличие\отсутствие свободного члена.

net = Net() print(net) Net(   (fc1): Linear(in_features=2, out_features=50, bias=True)   (fc2): Linear(in_features=50, out_features=50, bias=True)   (fc3): Linear(in_features=50, out_features=1, bias=True) )

Теперь создадим класс, который будет похож на модели, реализованные в библиотеке sklearn. Он будет предобрабатывать данные, тренировать нашу нейросеть, запоминать ход обучения и сохранять самую успешную попытку. Описание всех параметров находится внутри класса.

In [9]: class Trainer():     """     Parameters:         dataset: пользовательский класс, предобрабатывающий данные         loss_f: функция потерь         learning_rate: величина градиентного шага         epoch_amount: общее количество эпох         batch_size: размер одного бача         max_batches_per_epoch: максимальное количество бачей,                                 подаваемых в модель в одну эпоху         device: устройство для вычислений         early_stopping: количество эпох без улучшений до остановки обучения         optim: оптимизатор         scheduler: регулятор градиентного шага         permutate: перемешивание тренировочной выборки перед обучением      Attributes:         start_model: необученная модель         best_model: модель, после обучения         train_loss: средние значения функции потерь на тренировочных                      данных в каждой эпохе         val_loss: средние значения функции потерь на валидационных                    данных в каждой эпохе      Methods:         fit: обучение модели         predict: возвращает предсказание обученной моделью      """     def __init__(self,  dataset, net, loss_f, learning_rate=1e-3,                  epoch_amount=10, batch_size=12,                  max_batches_per_epoch=None,                 device='cpu', early_stopping=10,                  optim=torch.optim.Adam,                  scheduler=None, permutate=True):                  self.loss_f = loss_f         self.learning_rate = learning_rate         self.epoch_amount = epoch_amount         self.batch_size = batch_size         self.max_batches_per_epoch = max_batches_per_epoch         self.device = device         self.early_stopping = early_stopping         self.optim = optim         self.scheduler = scheduler         self.permutate = permutate         self.dataset = dataset         self.start_model = net         self.best_model = net          self.train_loss = []         self.val_loss = []      def predict(self, X):         return self.best_model(X)      def fit(self, X_train, X_test, y_train, y_test):          Net = self.start_model                      device = torch.device(self.device)          Net.to(self.device)          optimizer = self.optim(Net.parameters(), lr=self.learning_rate)                  if self.scheduler is not None:             scheduler = self.scheduler(optimizer)          train = self.dataset(X_train, y_train)         val = self.dataset(X_test, y_test)            train = DataLoader(train, batch_size=self.batch_size, shuffle=self.permutate)          val = DataLoader(val, batch_size=self.batch_size, shuffle=False)          best_val_loss = float('inf') # Лучшее значение функции потерь на валидационной выборке                                      # функции потерь на валидационной выборке         best_ep = 0                  # Эпоха, на которой достигалось лучшее                                       # значение функции потерь на валидационной выборке          for epoch in range(self.epoch_amount):              start = dt.datetime.now()             print(f'Эпоха: {epoch}', end=' ')             Net.train()             mean_loss = 0             batch_n = 0              for batch_X, target in train:                 if self.max_batches_per_epoch is not None:                     if batch_n >= self.max_batches_per_epoch:                         break                 optimizer.zero_grad()                  batch_X = batch_X.to(self.device)                 target = target.to(self.device)                  predicted_values = Net(batch_X)                 loss = self.loss_f(predicted_values, target)                 loss.backward()                 optimizer.step()                  mean_loss += float(loss)                 batch_n += 1                      mean_loss /= batch_n             self.train_loss.append(mean_loss)             print(f'Loss_train: {mean_loss}, {dt.datetime.now() - start} сек')              Net.eval()             mean_loss = 0             batch_n = 0              with torch.no_grad():                 for batch_X, target in val:                     if self.max_batches_per_epoch is not None:                         if batch_n >= self.max_batches_per_epoch:                             break                 batch_X = batch_X.to(self.device)                 target = target.to(self.device)                  predicted_values = Net(batch_X)                 loss = self.loss_f(predicted_values, target)                  mean_loss += float(loss)                 batch_n += 1                      mean_loss /= batch_n             self.val_loss.append(mean_loss)             print(f'Loss_val: {mean_loss}')              if mean_loss < best_val_loss:                 self.best_model = Net                 best_val_loss = mean_loss                 best_ep = epoch             elif epoch - best_ep > self.early_stopping:                 print(f'{self.early_stopping} без улучшений. Прекращаем обучение...')                 break             if self.scheduler is not None:                 scheduler.step()             print()

Прежде всего мы запишем все параметры обучения нашей модели в атрибуты класса внутри метода __init__. Также инициализируем два атрибута train_loss и val_loss, куда будем записывать значение функции потерь на тренировочной и валидационной выборке.

Затем создаем метод predict для удобного использования обученной модели. После создаем метод fit, который будет производить обучение нашей модели. На вход он будет принимать данные для тренировки и валидации. Внутри него помещаем нашу модель на то устройство, на котором собираемся производить вычисления (‘cpu’ — для процессора или ‘cuda:*’, где * номер вашей видеокарты. Это самые распространенные устройства, могут быть и другие).

Далее указываем оптимизатор torch.optim, который будет делать градиентные шаги. Если пользователь ничего не выбрал, будет использоваться torch.optim.Adam. Также пользователь может настроить изменение градиентных шагов в ходе обучения с помощью torch.optim.lr_scheduler, но это не обязательно.

После этого помещаем данные для тренировочной и валидационной выборки в класс, который создали ранее (в нашем случае это MyDataset).

Затем подаем наши подготовленные данные в torch.utils.data.DataLoader, указывая размер бача (сколько данных будет подаваться в модель за один раз). Классы torch.utils.data.DataLoader и torch.Tensor torch.utils.data.Dataset (от которого мы унаследовали класс MyDataset) служат для упрощения и ускорения загрузки данных и экономии памяти.

Подготовив данные и создав необходимые переменные, мы начинаем процесс обучения. Для этого переводим нашу модель в режим train. В этом состоянии будут работать различные методы регуляризации (батч-нормализация, дропаут и т.д). В нашем примере для простоты мы никаких методов регуляризации в нашу модель не добавили.

Внутри второго цикла мы перебираем батчи, которые нам отдает итератор train_dataloader. Каждый батч перемещаем на то устройство, которое мы выбрали в начале. Далее обнуляем градиент у оптимизатора, т.к. в PyTorch по умолчанию градиент будет накапливаться после каждой итерации. После этого делаем основные три действия: совершаем прямой проход по нейросети, считаем значение для функции потерь и на основе этого делаем градиентный шаг. Выводим значение функции потерь на тренировочной выборке и время, потраченное на обучение внутри одной эпохи.

Затем мы считаем значении функции потерь на валидационной выборке. Для этого мы переводим модель в режим eval и отключаем расчет градиента. После этого делаем то же, что и при обучении, но без градиентного шага. Результат функции потерь также выводим на экран. Если результат функции потерь на валидационной выборке оказался лучше, чем был, то мы сохраняем текущую модель в переменную best_model.
Все эти шаги повторяются столько раз, сколько мы указали в epoch_amount, либо до тех пор пока значение функции потерь на валидационной выборке не перестанет изменяться (если указан параметр early_stopping).

На выходе мы получим модель, которая лучшим образом показала себя на валидационной выборке, значение функции потерь на этой модели и два списка со значением функции потерь в ходе обучения на тренировочных и валидационных данных.

Теперь настало время опробовать класс на нашем датасете. В качестве функции потерь выберем бинарную кросс-энтропию torch.nn.BCELoss, а в качестве оптимизатора — стохастический градиентный спуск torch.optim.SGD.

Запишем все выбранные параметры в словарь и подадим его методу fit.

params = {     'dataset': MyDataset,     'net': net,     'epoch_amount': 1000,      'learning_rate': 1e-2,     'early_stopping': 25,     'loss_f': nn.BCELoss(),     'optim': torch.optim.SGD, }  clf = Trainer(**params) clf.fit(X_train, X_test, y_train, y_test)

Осталось визуализировать полученный результат.

Построим график изменения значения функции потерь на тренировочной и валидационной выборке от количества эпох.

def plot_loss(Loss_train, Loss_val):     plt.figure(figsize=(12, 5))     plt.plot(range(len(Loss_train)), Loss_train, color='orange', label='train', linestyle='--')     plt.plot(range(len(Loss_val)), Loss_val, color='blue', marker='o', label='val')     plt.legend()     plt.show()  plot_loss(clf.train_loss, clf.val_loss)

Визуализируем границу разделения.

def make_meshgrid(x1, x2, h=.02):     x1_min, x1_max = x1.min() - 2, x1.max() + 2     x2_min, x2_max = x2.min() - 2, x2.max() + 2     xx1, xx2 = np.meshgrid(np.arange(x1_min, x1_max, h), np.arange(x2_min, x2_max, h))     return xx1, xx2  def plot_contours(ax, xx1, xx2, **params):     C = clf.predict(torch.Tensor(np.c_[xx1.ravel(), xx2.ravel()])).detach().numpy()     C = C.reshape(xx1.shape)     out = ax.contourf(xx1, xx2, C, **params)     return out  fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12,5))  xx1, xx2 = make_meshgrid(X[0], X[1])  ax1.scatter(X_train[:,0][y_train == 0], X_train[:,1][y_train == 0], c='red', marker='o', edgecolors = 'black', alpha = 0.6) ax1.scatter(X_train[:,0][y_train == 1], X_train[:,1][y_train == 1], c='green', marker = 'o', edgecolors = 'black', alpha = 0.6) ax1.set_title('Train', fontsize=20) plot_contours(ax1, xx1, xx2, alpha=0.2)  ax2.scatter(X_test[:,0][y_test == 0], X_test[:,1][y_test == 0], c='red', marker='o', edgecolors = 'black', alpha = 0.6) ax2.scatter(X_test[:,0][y_test == 1], X_test[:,1][y_test == 1], c='green', marker = 'o', edgecolors = 'black', alpha = 0.6) ax2.set_title('Test', fontsize=20) plot_contours(ax2, xx1, xx2, alpha=0.2);

Желтым цветом показана область, где вероятность встретить точку из класса «1» максимальна. Соответственно, для точки из класса «0» вероятность максимальна в красной области. Как мы видим, граница разделения получилась нелинейной, т.к. в нейросетях используются нелинейные функции активации.

ссылка на оригинал статьи https://habr.com/ru/post/553716/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *