PyTorch — это мощный и гибкий фреймворк для машинного обучения, широко используемый для создания нейронных сетей. Он особенно популярен благодаря простоте использования, динамическим вычислительным графам и богатой экосистеме инструментов для обучения моделей. Для использования этого фреймворка, часто достаточно поверхностно понимать работу алгоритмов машинного обучения.
Но Андрей Карпаты, известный исследователь в области ИИ, считает, что реализация алгоритмов с нуля позволяет понять их суть и детали работы, что сложно осознать, используя только готовые библиотеки. Это помогает развить интуицию для дальнейшего применения и улучшения методов. Андрей посвящает много собственного времени, чтобы объяснять ключевые принципы работы нейросетей в своих блогах и на своём ютуб-канале. Он также не раз подчеркивал, что на его курсе в Cтэнфорде есть задачи по реализации различных алгоритмов, например, обратное распространение.
Я хотел бы посвятить данную статью этой идеи, потому что мне самому особенно интересно копаться в алгоритмах глубокого обучения. Эта статья продолжение первой статьи
Сегодня мы:
-
добавим CNN, BatchNorm, MaxPool, MinPool
-
реализуем RMSProp, NaG, Adam
-
добавим регуляризацию в loss-функцию
-
добавим новые функции активации
-
напишем DataLoader
Поехали!
Мы остановились на том, что сделали достаточно похожую на PyTorch обертку.
loss_fn = CrossEntropyLoss() model = SimpleNet() optim = SGD(model.parameters(), learning_rate = 0.01) for i in range(100): output = model(input_x) loss = loss_fn(output, target_x) loss.backward() optim.step()
Давайте теперь попытаемся обучить на каких-нибудь реальных данных! Для этой задачи нам подойдет набор рукописных цифр MNIST. Но как нам эффективно обработать весь датасет? Вспомним как это делается в PyTorch. Мы создаем объект класса DataLoader — который позволяет разбить весь датасет на батчи и работает как итерируемый объект. Давайте продолжим создавать собственные реализации.
Давайте хранить ссылку на наш набор в переменной dataset. Например
import pandas as pd import numpy as np data_pd = pd.read_csv(path) dataset = np.array(data_pd).astype(float) np.random.shuffle(dataset) # перемешаем датасет dataset.shape >>> (5000, 785) # 1 значение класса + 784 значения пискелей
Dataloader
class DataLoader(): def __init__(self, data, batch_size=64, shuffle=True, flatten = True): self.data = data self.index = 0 # индекс нужен для корректной итерации self.items = [] # здесь будем хранить наборы батчей self.flatten = flatten # определяем сколько раз сможем проитерировать весь набор self.max = data.shape[0] // batch_size + (1 if data.shape[0] % batch_size != 0 else 0) if shuffle == True: # мешаем набор если захотел пользователь self.data = np.random.permutation(self.data) # создаем список всех батчей for _ in range(self.max): self.items.append(self.data[batch_size * _: batch_size * (_ + 1)]) # превращаем в итерируемый объект def __iter__(self): return self # для циклов for и while def __next__(self): if self.index < self.max: value = self.items[self.index] # получаем батч self.index += 1 if self.flatten: # возвращаем в нашем случае либо ((64, 784), (64, 1)) return value[:, 1:], value[:, 0].reshape(-1, 1) else: # либо ((64, 28, 28), (64, 1)), то есть как изображение return value[:, 1:].reshape(value.shape[0], 28, 28), value[:, 0].reshape(-1, 1) else: self.index = 0 raise StopIteration # выходим из цикла
Теперь вся загрузка набора данных по батчам выглядит так:
data_loader = DataLoader(dataset, batch_size=64, shuffle=True, flatten=True)
Давайте посмотрим на наш батч.
for x, target in dataloader: break target.shape >>> (64, 1)
Странно, у нас вроде 10 классов, а размер почему-то 1. Заглянем внутрь
target[0] >>> array([8.])
Это номер класса, а мы ведь ждали, что будет [0, 0, 0, 0, 0, 0, 0, 0, 1, 0]
Тогда давайте обработаем такой случай внутри нашего класса CrossEntropyLoss
class CrossEntropyLoss: def __call__(self, logits, true): predicted = np.exp(logits) / np.sum(np.exp(logits), axis=1).reshape(-1, 1) # softmax self.predicted = np.array(predicted, copy=True) # сделаем копию входных матрицы для дальнейших вычислений ### Добавили эти строчки number_of_classes = predicted.shape[1] self.true = np.int_(np.arange(0, number_of_classes) == true) ### # вычисляем значение лосс-функции прямо по формуле self.loss = -1 * np.sum(self.true * np.log(self.predicted + 1e-5), axis=1) return self
Теперь попробуем обучить нашу нейронку. Создадим модель
class SimpleNet(Module): def __init__(self): super().__init__() self.linear1 = Linear(input_channels=784, output_channels=200, bias=True) self.linear2 = Linear(input_channels=200, output_channels=50, bias=True) self.linear3 = Linear(input_channels=50, output_channels=10, bias=True) self.relu = ReLU() def forward(self, x): x = self.linear1(x) x = self.relu(x) x = self.linear2(x) x = self.relu(x) x = self.linear3(x) return x
Инициализируем всё необходимое
loss_fn = CrossEntropyLoss() model = SimpleNet() optim = SGD(model.parameters(), learning_rate = 0.01)
Для нашего набора в 5000 экземпляров получится 5000 // 64 = 78 батчей. Пройдемся, например, 5 раз по нашему датасету, то есть у нас будет 5 эпох.
for i in range(5): for batch in data_loader: input_x, target = batch input_x = input_x / 255 # нормируем на [0, 1], иначе обучение может стать нестабильным output = model(input_x) loss = loss_fn(output, target) loss.backward() optim.step() print(loss.loss.mean(), i) # Берем среднее значение лосса по батчу >>>0.622 0 0.294 1 0.166 2 0.101 3 0.070 4
Значение loss-функции уменьшается, значит наша модель обучается на нашем наборе данных!
Давайте пойдем дальше и добавим новые слои! В первую очередь я хотел бы добавить свёрточные слои, так как именно они хорошо справляются с сложными изображениями. Только лишь на линейных слоях мы далеко не уйдем.
CNN
Операция свертки состоит в сворачивании изображения с помощью фильтра. Фильтр или kernel — это тоже матрица чисел.
Свертка изображения размера 4х4 фильтром размера 3х3: Полученный результат называется картой активации. Свертка картинки шаг за шагом:
Заметим, что размер карты активации после свертки стал меньше, чем размер изначального изображения. Общая формула размера карты активации такая:
m = (i − f + 2 * padding) / stride + 1,
где:
-
m— размер карты активации; -
i— размер изображения; -
f— размер фильтра. -
padding— отступ от края -
stride— шаг фильтра
Рекомендую ознакомиться с хорошим ноутбуком, чтобы понять интуицию свёрток
Как будет выглядеть свёрточная сеть для классификации картинок:

У нас уже есть слои Flatten и Linear. Остается только Conv2d.
Давайте попробуем реализовать простую операцию свертки одной картинки и одного фильтра.
image = np.array([[0, 50, 0, 29], [0, 80, 31, 2], [33, 90, 0, 75], [0, 9, 0, 95] ]) kernel = np.ones((3, 3)) image, kernel >>>array([[ 0, 50, 0, 29], [ 0, 80, 31, 2], [33, 90, 0, 75], [ 0, 9, 0, 95]]), array([[1., 1., 1.], [1., 1., 1.], [1., 1., 1.]])
i = image.shape[0] f = kernel.shape[0] padding = 0 step = 1 m = (i - f + 2*padding) // step + 1 >>> 2
new_image = np.zeros((m, m)) for y in range(m): for x in range(m): start_x = x * step end_x = start_x + f start_y = y * step end_y = start_y + f new_image[y][x] = np.sum(image[start_y:end_y, start_x:end_x] * kernel) new_image >>> array([[284., 357.], [243., 382.]])
Проверим с библиотечной реализацией.
import scipy.signal scipy.signal.fftconvolve(image, kernel, mode='valid') >>>array([[284., 357.], [243., 382.]])
В дальнейшем мы будем использовать для операции свёртки именно scipy.signal.fftconvolve, потому что это весьма ресурсоемкая операция, а данный метод позволяет посчитать тоже самое быстрее с помощью быстрого преобразования Фурье!
Давайте попробуем цветную картинку, то есть не (height, width), а (channels, height, width)
Спойлер: есть два способа производить свертку многоканальной картинки

Мы будем использовать первый метод, так как он наиболее часто используемый.
Сгенерируем картинку и фильтр
image = np.random.randn(3, 7, 7) kernel = np.ones((3, 3, 3))
Реализация операции почти никак не поменяется
i = image.shape[1] f = kernel.shape[1] padding = 0 step = 1 m = (i-f + 2*padding) // step +1 new_image = np.zeros((m, m)) for y in range(m): for x in range(m): start_x = x * step end_x = start_x + f start_y = y * step end_y = start_y + f new_image[y][x] = np.sum(image[:, start_y:end_y, start_x:end_x] * kernel)
Сравниваем!
np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid')) >>> True
Круто!
Теперь давайте сделаем операцию свёртки на нескольких картинках одним фильтром. Опять совсем немного усложним код
image = np.random.randn(10, 3, 7, 7) kernel = np.ones((1, 3, 3, 3)) i = image.shape[-1] f = kernel.shape[-1] padding = 0 step = 1 m = (i-f + 2*padding) // step +1 number_of_images = image.shape[0] new_image = np.zeros((number_of_images, m, m)) for image_n in range(number_of_images): for y in range(m): for x in range(m): start_x = x * step end_x = start_x + f start_y = y * step end_y = start_y + f new_image[image_n][y][x] = np.sum(image[image_n,:, start_y:end_y, start_x:end_x] * kernel)
Проверим
np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid').squeeze(axis=1)) >>> True
Ура, получилось, теперь давайте наоборот, пройдемся несколькими фильтрами по одной картинке!
image = np.random.randn(1, 3, 7, 7) kernel = np.ones((5, 3, 3, 3)) i = image.shape[-1] f = kernel.shape[-1] step = 1 m = (i-f) // step +1 number_of_kernels = kernel.shape[0] new_image = np.zeros((number_of_kernels, m, m)) for kernel_n in range(number_of_kernels): for y in range(m): for x in range(m): start_x = x * step end_x = start_x + f start_y = y * step end_y = start_y + f new_image[kernel_n][y][x] = np.sum(image[0, :, start_y:end_y, start_x:end_x] * kernel[kernel_n])
Снова сравниваем!
np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid').squeeze(axis=1)) >>> True
Это очень круто! А теперь давайте соберём это всё и реализуем операцию свертки нескольких фильтров на нескольких картинках!
image = np.random.randn(10, 1, 3, 7, 7) kernel = np.ones((1, 5, 3, 3, 3)) i = image.shape[-1] f = kernel.shape[-1] padding = 0 step = 1 m = (i-f + 2*padding) // step +1 number_of_kernels = kernel.shape[1] number_of_images = image.shape[0] new_image = np.zeros((number_of_images, number_of_kernels, m, m)) for image_n in range(number_of_images): for kernel_n in range(number_of_kernels): for y in range(m): for x in range(m): start_x = x * step end_x = start_x + f start_y = y * step end_y = start_y + f new_image[image_n][kernel_n][y][x] = np.sum(image[image_n, 0, :, start_y:end_y, start_x:end_x] * kernel[0, kernel_n])
np.allclose(new_image, scipy.signal.fftconvolve(image, kernel, mode='valid').squeeze(axis=2)) >>> True
Очень крутой результат! Мы с вами смогли полностью реализовать одну из базовых и самых важных операций в нейронных сетях.
В PyTorch за операцию свертки отвечают слои nn.Conv1d, nn.Conv2d, nn.Conv3d — логика у них у всех одинаковая, только немного отличается обработка. Например, мы реализовали nn.Conv2d, если бы мы делали метод 2 на предыдущей картинке, то получили бы nn.Conv1d. Теперь давайте соберем все знания в один класс.
Note!
Давайте условимся, что картинка в этот слой будет подаваться в виде (batch, channels, height, width)
Например, 1 черно-белая картинка должна иметь формат (1, 1, 28, 28)
Также для операции в PyTorch можно использовать разные значения padding и stride. Можно сделать реализацию операции свёртки учитывая эти параметры, но я предлагаю этого не делать, так как это заметно усложнит реализацию!
Итак, padding = 0, stride=1 всегда
Conv2d
class Conv2d: def __init__(self, input_channels: int, output_channels: int, kernel_size: int, bias = True): self.bias = bias self.kernel_size = (output_channels, input_channels, kernel_size, kernel_size) self.filter_array = np.random.randn(1, *self.kerne_size) * 0.1 # домножаем на 0.1 чтобы начальные веса были совсем маленькими self.bias = np.random.randn(1, output_channels, 1, 1) * 0.1 Parameter([self, self.filter_array, self.bias * bias]) def __call__(self, x): self.x = np.expand_dims(x, axis=1) # сохраняем на всякий случай значение # x: (batch, channels, height, width) - > (batch, 1, channels, height, width) return scipy.signal.fftconvolve(np.expand_dims(x, axis=1), self.filter_array, mode='valid').squeeze(axis=2) + self.bias
Как всё красиво выглядит с использованием scipy.signal.fftconvolve, но мы с вами имеем право сделать такую замену, потому что мы разобрались в деталях реализации и убедились, что можем корректно сделать реализацию такой функции. Но конечно вам никто не запрещает заменить scipy.signal.fftconvolve на нашу реализацию.
В свёрточных слоях также используют bias, как в линейных слоях, чтобы получить дополнительную степень свободы при обучении. Метод .backward() добавим чуть позже
Рекомендую также ознакомиться с данным плейлистом, с данным плейлистом и с видео
Идём дальше. Предлагаю не ограничиваться одной функцией активации!
Sigmoid

class Sigmoid: def __init__(self): self.input = None Parameter([self, []]) @staticmethod def sigmoid_(x): return 1 / (1 + np.exp(-x)) def __call__(self, x): self.input = x return self.sigmoid_(x) def backward(self, input_matrix): return (1 - self.sigmoid_(self.input)) * self.sigmoid_(self.input) * input_matrix
Leaky Relu

class Leaky_relu: def __init__(self, a = 0.2): self.a = a Parameter([self, []]) def __call__(self, x): self.input = np.array(x, copy=True) return x * ((1 + np.sign(x)) / 2 + self.a * (1 + np.sign(-x)) / 2) def derivative(self, input_matrix): return ((1 + np.sign(self.input)) / 2 + self.a * (1 + np.sign(-self.input)) / 2) * input_matrix
Tanh

class Tanh: def __init__(self): self.input = None Parameter([self, []]) @staticmethod def tanh_(x): return (np.exp(2 * x) - 1) / (np.exp(2 * x) + 1) def __call__(self, x): self.input = np.array(x, copy=True) return self.tanh_(x) def derivative(self, input_matrix): return 1 - self.tanh_(self.input) * input_matrix
Добавим теперь регуляризацию, она позволяет контролировать величину весов модели.
class CrossEntropyLoss: def __init__(self, l1_reg = 0, l2_reg = 0): self.l1_reg = l1_reg self.l2_reg = l2_reg def backward(self): self.backward_list = [] loss = self.predicted - self.true for index, layer in enumerate(Parameter.layers[::-1]): if type(layer).__name__ == 'Linear': base = (layer.x.T @ loss) / loss.shape[0] l1_term = self.l1_reg * np.sign(Parameter.calling[layer][0]) l2_term = self.l2_reg * Parameter.calling[layer][0] changes_w = base + l1_term + l2_term ... ...
Добавим теперь новые алгоритмы градиентного спуска
Nesterov Accelerated Gradient
Очень рекомендую ознакомиться с данными материалами первое и второе
class NAG: def __init__(self, model, learning_rate, momentum): self.model = model self.lr = learning_rate self.momentum = momentum self.last_grad_w = None # будем хранить здесь предыдущий градиент self.last_grad_b = None # будем хранить здесь предыдущий градиент def step(self): if self.last_grad_w == None: self.last_grad_w = [0] * len(self.model._constructor_Parameter.layers) # храним для каждого слоя отдельно self.last_grad_b = [0] * len(self.model._constructor_Parameter.layers) for index, layer in enumerate(self.model._constructor_Parameter.layers[::-1]): if type(layer).__name__ in ('Linear', 'Conv2d'): weight, bias = self.model._constructor_Parameter.calling[layer] weight_gradient, bias_gradient = layer.backward_list[0], layer.backward_list[1] # можно сказать, что это скальзящее среднее для градиентов self.last_grad_w[index] = - self.lr * weight_gradient + self.momentum * self.last_grad_w[index] self.last_grad_b[index] = - self.lr * bias_gradient + self.momentum * self.last_grad_b[index] # обвновляем веса new_weight = weight + self.last_grad_w[index] new_bias = bias + self.last_grad_и[index] # меняем на обновленные веса self.model._constructor_Parameter.calling[layer] = [new_weight, new_bias]
RMSProp
Очень рекомендую ознакомиться с данными материалами первое и второе
class RMSProp: def __init__(self, model, learning_rate, ro): self.model = model self.lr = learning_rate if ro < 0 or ro > 1: raise Exception("Incorrect ro value") self.ro = ro self.grad_velocity_w = None self.grad_velocity_b = None def step(self): if self.grad_velocity_w== None: self.grad_velocity_w = [0] * len(self.model._constructor_Parameter.layers) self.grad_velocity_b = [0] * len(self.model._constructor_Parameter.layers) for index, layer in enumerate(self.model._constructor_Parameter.layers[::-1]): if type(layer).__name__ in ('Linear', 'Conv2d'): weight, bias = self.model._constructor_Parameter.calling[layer] weight_gradient, bias_gradient = layer.backward_list[0], layer.backward_list[1] self.grad_velocity_w[index] = self.ro * self.grad_velocity_w[index] + (1 - self.ro) * weight_gradient ** 2 self.grad_velocity_b[index] = self.ro * self.grad_velocity_b[index] + (1 - self.ro) * bias_gradient ** 2 new_weight = weight - self.lr * weight_gradient / np.sqrt(self.grad_velocity_w[index] + 1e-5) new_bias = bias - self.lr * bias_gradient / np.sqrt(self.grad_velocity_b[index] + 1e-5) self.model._constructor_Parameter.calling[layer] = [new_weight, new_bias]
Adam
Снова очень рекомендую ознакомиться с данными материалами первое и второе
Грубо говоря adam это сумма rmsprop и nag, поэтому просто «сложим» 2 реализации
class Adam: def __init__(self, model, learning_rate, momentum, ro): self.model = model self.lr = learning_rate self.momentum = momentum self.last_grad_w = None self.last_grad_b = None if ro < 0 or ro > 1: raise Exception("Incorrect ro value") self.ro = ro self.grad_velocity_w = None self.grad_velocity_b = None def step(self): if self.last_grad_w == None: self.last_grad_w = [0] * len(self.model._constructor_Parameter.layers) self.last_grad_b = [0] * len(self.model._constructor_Parameter.layers) self.grad_velocity_w = [0] * len(self.model._constructor_Parameter.layers) self.grad_velocity_b = [0] * len(self.model._constructor_Parameter.layers) for index, layer in enumerate(self.model._constructor_Parameter.layers[::-1]): if type(layer).__name__ in ('Linear', 'Conv2d'): weight, bias = self.model._constructor_Parameter.calling[layer] weight_gradient, bias_gradient = layer.backward_list[0], layer.backward_list[1] self.grad_velocity_w[index] = self.ro * self.grad_velocity_w[index] + (1 - self.ro) * weight_gradient ** 2 self.grad_velocity_b[index] = self.ro * self.grad_velocity_b[index] + (1 - self.ro) * bias_gradient ** 2 self.last_grad_w[index] = - self.lr * weight_gradient + self.momentum * self.last_grad_w[index] self.last_grad_b[index] = - self.lr * bias_gradient + self.momentum * self.last_grad_b[index] new_weight = weight + self.last_grad_w[index] / np.sqrt(self.grad_velocity_w[index] + 1e-5) new_bias = bias + self.last_grad_b[index] / np.sqrt(self.grad_velocity_b[index] + 1e-5) self.model._constructor_Parameter.calling[layer] = [new_weight, new_bias]
Обучение
Итак, мы добавили все необходимое. Осталось только добавить обновление весов для Conv2d. Сейчас будет немного сложновато, но у нас всё получится!
Я очень рекомендую ознакомиться с данными материалами, в них ребята на пальцах рассказывают про расчет обратного распространения через свёрточный слой. Я буду пользоваться их результатами
Давайте проверим корректность этих формул в общем случае с помощью scipy.signal.fftconvolve

image = np.random.randn(10, 1, 3, 7, 7) # Наша входная картинка kernel = np.random.randn(1, 5, 3, 3, 3) # Наши фильтры bias = np.random.randn(1, 5, 1, 1) # наш bias result = scipy.signal.fftconvolve(image, kernel, mode='valid') #(10, 5, 1, 5, 5) # Предположим, что это наш dl/dx - по размерности он совпадает как раз
Считаем градиент для фильтров по формуле с картинки
scipy.signal.fftconvolve(image, result, mode='valid').shape >>> (1, 5, 3, 3, 3)
Видим, что совпадает с размерностью фильтров. Идём дальше
Считаем градиент для bias по формуле с картинки
result.sum(axis=(0, 2, 3, 4)) >>> (5,)
У нас также всё совпадает благодаря broadcasting

Считаем градиент для дальнейшего прохождения по формуле с картинки
rot = np.transpose(kernel, (0, 1, 2, 4, 3)) # поменяли местами 2 последние оси pad = np.pad(result, ((0, 0), (0, 0), (0, 0), (2, 2), (2, 2)), 'constant', constant_values=(0)) # Заполнили 2 последние размерности 0 слева и справа
Пример использования np.pad
tmp = np.random.randn(2, 2) np.pad(tmp, ((1, 1), (1, 1)), 'constant', constant_values=(0)) >>>array([[ 0. , 0. , 0. , 0. ], [ 0. , 1.93470843, -1.40590807, 0. ], [ 0. , -0.61160614, 0.15255254, 0. ], [ 0. , 0. , 0. , 0. ]])
scipy.signal.fftconvolve(pad, rot, mode='valid').shape >>> (10, 1, 3, 7, 7)
И у нас всё опять сошлось с размерностью image
Значит мы можем использовать эти формулы для расчетов градиентов. Давайте добавим наконец-то вычисления в класс CrossEntropyLoss
class CrossEntropyLoss: def backward(self): for index, layer in enumerate(Parameter.layers[::-1]): elif type(layer).__name__ == 'Conv2d': expanded_loss = np.expand_dims(loss, axis=2) changes_w = scipy.signal.fftconvolve(layer.x, expanded_loss, mode='valid') changes_b = loss.sum(axis=(0, 2, 3)) * self.bias_flag layer.backward_list = [changes_w, changes_b] rotated_filters = np.transpose(layer.filter_array, (0, 1, 2, 4, 3)) pad = layer.kernel_size[-1] padded_loss = np.pad(expanded_loss, ((0, 0), (0, 0), (0, 0), (pad - 1, pad - 1), (pad - 1, pad - 1)), 'constant', constant_values=(0)) loss = scipy.signal.fftconvolve(padded_loss, rotated_filters, mode='valid').squeeze(axis=1)
Готово! Оказалось совсем несложно, да?)
Теперь давайте немного поменяем алгоритм нашего класса Flatten. Он будет применяться после всех сверточных слоёв чтобы превратить вектор из (batch, channels, height, width) в (batch, channels * height * width) и отправить этот вектор в линейный слой. Также добавим для него метод .backward, потому что он уже является промежуточным слоем.
class Flatten: def __init__(self): Parameter([self, []]) def __call__(self, x): self.init_shape = x.shape return x.reshape(self.init_shape[0], -1) def backward(self, input_matrix): return input_matrix.reshape(self.init_shape) class CrossEntropyLoss: def backward(self): for index, layer in enumerate(Parameter.layers[::-1]): ... elif type(layer).__name__ == 'Flatten': loss = layer.backward(loss)
Всё готово! Давайте собирать и обучать нашу свёрточную нейронку!
class SimpleNet(Module): def __init__(self): super().__init__() self.conv1 = Conv2d(input_channels = 1, output_channels = 4, kernel_size=5) #28 -> 24 self.conv2 = Conv2d(input_channels = 4, output_channels = 8, kernel_size=5) #24 -> 20 self.conv3 = Conv2d(input_channels = 8, output_channels = 16, kernel_size=5) #20 -> 16 self.flatten = Flatten() self.linear1 = Linear(input_channels=16 * 16 * 16, output_channels=200, bias=True) self.linear2 = Linear(input_channels=200, output_channels=50, bias=True) self.linear3 = Linear(input_channels=50, output_channels=10, bias=True) self.relu = ReLU() def forward(self, x): x = self.conv1(x) x = self.relu(x) x = self.conv2(x) x = self.relu(x) x = self.conv3(x) x = self.relu(x) x = self.flatten(x) x = self.linear1(x) x = self.relu(x) x = self.linear2(x) x = self.relu(x) x = self.linear3(x) return x loss_fn = CrossEntropyLoss(l1_reg=0.1, l2_reg=0.1) model = SimpleNet() optim = Adam(model.parameters(), learning_rate = 0.001, momentum=0.9, ro=0.9) for i in range(5): for index, batch in enumerate(data_loader): input_x, target = batch input_x = input_x / 255 input_x = np.expand_dims(input_x, axis=1) # (64, 28, 28) -> (64, 1, 28, 28) output = model(input_x) loss = loss_fn(output, target) loss.backward() optim.step() if index % 1 == 0: print(loss.loss.mean(),"index:", index) print(loss.loss.mean(), "epoch:", i) >>>7.968706513917543 index: 0 7.551666321134583 index: 1 7.278343803030227 index: 2 6.431693753643258 index: 3 5.637298937703794 index: 4 5.633115375065631 index: 5 5.00220357224557 index: 6 5.925713000464821 index: 7 5.045218328819374 index: 8 5.103600182770298 index: 9 4.13759075103372 index: 10 3.576208929225664 index: 11 3.320147584705806 index: 12 2.995831081105269 index: 13 2.8976790588242745 index: 14 2.8176135148843797 index: 15 2.7363129802870807 index: 16 2.297198679475752 index: 17 2.310984252722919 index: 18 2.2726362928678476 index: 19 2.11617617635163 index: 20 2.23533730800758 index: 21 2.064323468827378 index: 22 1.9638605959573385 index: 23 2.1304361443736948 index: 24 2.0881922952210124 index: 25 1.9733052417437202 index: 26 2.099729829345195 index: 27 2.0752790945980193 index: 28
Отлично, модель кое-как обучается, только вот значение loss-функции в какой-то момент перестает изменятся. Возможно причиной этого может быть слишком сложная задача или слишком сложная модель (в одном только linear1 - 16*16*16*200 = 800_000 весов)
Давайте упростим немного задачу нашей модели, введя еще один слой — MaxPool
MaxPool
Pooling — это операция уменьшения карт активации. Делается она очень просто:
-
Берем карту активации, делим ее на квадраты размера два на два;
-
Из каждого квадрата два на два берем только одно число: максимальное число в этом квадрате;
-
Записываем эти числа вместо каждого квадрата два на два. Так мы из карты активации размера (n x n) получаем карту активации размера (n/2 x n/2)
Такая операция называется MaxPooling с ядром размера 2. Ядро размера 2, потому что мы делили карту активации на квадраты размера 2х2.

class MaxPool: def __init__(self, kernel_size: tuple): self.kernel_size = kernel_size Parameter([self, []]) def __call__(self, x): array = x.copy() result_full = np.zeros((array.shape[0], array.shape[1], int(array.shape[2] / self.kernel_size[0]), int(array.shape[3] / self.kernel_size[1]))) for k in range(array.shape[0]): for m in range(array.shape[1]): result = [] self.i = 0 while self.i < array[k][m].shape[0] - self.kernel_size[0] + 1: self.j = 0 while self.j < array[k][m].shape[1] - self.kernel_size[1] + 1: result.append(np.max(array[k][m][self.i:self.i + self.kernel_size[0], self.j:self.j + self.kernel_size[1]])) array[k][m][self.i:self.i + self.kernel_size[0], self.j:self.j + self.kernel_size[1]] = (array[k][m][self.i:self.i + self.kernel_size[0], self.j: self.j + self.kernel_size[1]]) * [array[k][m][self.i:self.i + self.kernel_size[0], self.j:self.j +self.kernel_size[1]] == np.max(array[k][m][self.i:self.i +self.kernel_size[0], self.j:self.j +self.kernel_size[1]])] self.j += self.kernel_size[1] self.i += self.kernel_size[0] result_full[k][m] = np.array(result).reshape(int(array[k][m].shape[0] / self.kernel_size[0]), int(array[k][m].shape[1] / self.kernel_size[1])) self.array = array return result_full
Как видим, логика простая, а реализация не очень. Давайте заменим на более короткую функцию skimage.measure.block_reduce и сравним результаты
import skimage class MaxPool2d: def __init__(self, kernel_size: tuple): self.kernel_size = kernel_size Parameter([self, []]) def __call__(self, x): self.x = np.array(x, copy=True) return skimage.measure.block_reduce(a, (1, 1, *self.kernel_size), np.max) class MinPool2d: def __init__(self, kernel_size: tuple): self.kernel_size = kernel_size Parameter([self, []]) def __call__(self, x): self.x = np.array(x, copy=True) return skimage.measure.block_reduce(a, (1, 1, *self.kernel_size), np.min)
a = np.random.randn(10, 20, 30, 30) maxpool1 = MaxPool1((2, 2)) # Наша реализация maxpool2 = MaxPool2((2, 2)) # Реализация Scimage np.allclose(maxpool1(a), maxpool2(a)) >>>True a = np.random.randn(4, 19, 20, 20) maxpool1 = MaxPool1((5, 5)) # Наша реализация maxpool2 = MaxPool2((5, 5)) # Реализация Scimage np.allclose(maxpool1(a), maxpool2(a)) >>>True
Раз они одинаковые, давайте оставим более короткую реализацию. Углубляться в свою реализацию я не стану, так как она довольная простая, просто немного запутанная.
Теперь добавим backprop для max-min pool
Логика простая, инициализируем нулевую матрицу, и на тех местах, где было наибольшее ( или наименьше в случае MinPool) значение в окне меняем 0 на 1. Всё просто, но реализация всё также немного запутанная! Я верю, что можно сделать более эффективно, но пока не хочу останавливаться на этом!
class CrossEntropyLoss: def backward(self): for index, layer in enumerate(Parameter.layers[::-1]): ... elif type(layer).__name__ == 'MaxPool2d': new_shape = np.zeros(layer.x.shape) for k in range(layer.x.shape[0]): for m in range(layer.x.shape[1]): inx_ = 0 inx__ = 0 layer.i = 0 while layer.i < layer.x[k][m].shape[0] - layer.kernel_size[0] + 1: layer.j = 0 inx__ = 0 while layer.j < layer.x[k][m].shape[1] - layer.kernel_size[1] + 1: new_shape[k][m][layer.i:layer.i + layer.kernel_size[0], layer.j:layer.j + layer.kernel_size[1]] = loss[k][m][inx_][inx__] inx__ += 1 layer.j += layer.kernel_size[1] inx_ += 1 layer.i += layer.kernel_size[0] loss = np.squeeze([layer.x > 0] * new_shape, axis=0) # Для minpool аналогично, только поменять знак
Теперь собираем
class SimpleConvNet(Module): def __init__(self): super().__init__() self.conv1 = Conv2d(input_channels = 1, output_channels = 4, kernel_size=3) #28 -> 26 self.maxpool1 = MaxPool2d(kernel_size=(2,2)) # 26 -> 13 self.conv2 = Conv2d(input_channels = 4, output_channels = 8, kernel_size=4) #13 -> 10 self.maxpool2 = MaxPool2d(kernel_size=(2,2)) # 10 -> 5 self.flatten = Flatten() self.linear1 = Linear(input_channels= 5 * 5 * 8, output_channels=50, bias=True) self.linear2 = Linear(input_channels=50, output_channels=10, bias=True) self.relu = ReLU() def forward(self, x): x = self.conv1(x) x = self.relu(x) x = self.maxpool1(x) x = self.conv2(x) x = self.relu(x) x = self.maxpool2(x) x = self.flatten(x) x = self.linear1(x) x = self.relu(x) x = self.linear2(x) return x loss_fn = CrossEntropyLoss(l1_reg=0.001, l2_reg=0.001) model = SimpleConvNet() optim = Adam(model.parameters(), learning_rate = 5e-3, momentum=0.9, ro=0.9) for i in range(5): y_pred_list = [] y_true_list = [] for index, batch in enumerate(data_loader): input_x, target = batch input_x = input_x / 255 input_x = np.expand_dims(input_x, axis=1) # (64, 28, 28) -> (64, 1, 28, 28) output = model(input_x) loss = loss_fn(output, target) loss.backward() optim.step() y_pred_list.extend(output) y_true_list.extend(np.int_(np.arange(0, 10) == target)) if index % 20 == 0: print(f"loss: {loss.loss.mean():.2f}","index:", index, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}") print(f"loss: {loss.loss.mean():.2f}", "epoch:", i, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}") >>>loss: 2.94 index: 0 acc: 0.08 loss: 2.02 index: 20 acc: 0.23 loss: 1.76 epoch: 0 acc: 0.31 loss: 1.67 index: 0 acc: 0.53 loss: 1.06 index: 20 acc: 0.60 loss: 1.05 index: 40 acc: 0.63 loss: 0.98 index: 60 acc: 0.65 loss: 1.05 epoch: 1 acc: 0.65 loss: 0.98 index: 0 acc: 0.72 loss: 0.92 index: 20 acc: 0.64 loss: 1.27 index: 40 acc: 0.62 loss: 1.04 index: 60 acc: 0.62 loss: 0.97 epoch: 2 acc: 0.63 loss: 0.73 index: 0 acc: 0.77
Отлично! У нас всё обучается!
Теперь давайте добавим еще один метод регуляризации BatchNorm. Будем применять его после линейных слоев. Вычисление производной для BatchNorm. Здесь опять же просто механическая работа в виде следования формулам!
class BatchNorm2d: def __init__(self, size): self.conv = False Parameter([self, np.ones((size)), np.ones((size))]) def __call__(self, x): self.mean = np.mean(x, axis=(0)) self.std = np.std(x, axis=(0)) + 0.0001 self.x = (x - self.mean) / self.std return Parameter.calling[self][0] * self.x + Parameter.calling[self][1] class CrossEntropyLoss: def backward(self): for index, layer in enumerate(Parameter.layers[::-1]): ... layer.backward_list = [np.sum(loss * layer.x, axis = 0), np.sum(loss, axis = 0)] dl_dx = loss * Parameter.calling[layer][0] dl_dstd = np.sum(dl_dx * (layer.x * layer.std) * (-1/2) / (layer.std ** 3), axis = 0) dl_dmean = np.sum(dl_dx * (- 1 / layer.std), axis = 0) + dl_dstd * (np.sum(-2 * layer.x * layer.std, axis = 0) / len(layer.x)) loss = dl_dx / layer.std + dl_dstd * 2 * (layer.x * layer.std) / len(layer.x) + dl_dmean / len(layer.x)
Обучаем
class SimpleConvNet(Module): def __init__(self): super().__init__() self.conv1 = Conv2d(input_channels = 1, output_channels = 4, kernel_size=3) #28 -> 26 self.maxpool1 = MaxPool2d(kernel_size=(2,2)) # 26 -> 13 self.conv2 = Conv2d(input_channels = 4, output_channels = 8, kernel_size=4) #13 -> 10 self.maxpool2 = MaxPool2d(kernel_size=(2,2)) # 10 -> 5 self.flatten = Flatten() self.linear1 = Linear(input_channels=5 * 5 * 8, output_channels=50, bias=True) self.bn = BatchNorm2d(50) self.linear2 = Linear(input_channels=50, output_channels=10, bias=True) self.relu = ReLU() def forward(self, x): x = self.conv1(x) x = self.relu(x) x = self.maxpool1(x) x = self.conv2(x) x = self.relu(x) x = self.maxpool2(x) x = self.flatten(x) x = self.linear1(x) x = self.bn(x) x = self.relu(x) x = self.linear2(x) return x loss_fn = CrossEntropyLoss(l1_reg=0.005, l2_reg=0.005) model = SimpleConvNet() optim = Adam(model.parameters(), learning_rate = 1e-3, momentum=0.99, ro=0.99) for i in range(5): y_pred_list = [] y_true_list = [] for index, batch in enumerate(data_loader): input_x, target = batch input_x = input_x / 255 input_x = np.expand_dims(input_x, axis=1) # (64, 28, 28) -> (64, 1, 28, 28) output = model(input_x) loss = loss_fn(output, target) loss.backward() optim.step() y_pred_list.extend(output) y_true_list.extend(np.int_(np.arange(0, 10) == target)) if index % 20 == 0: print(f"loss: {loss.loss.mean():.2f}","index:", index, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}") print(f"loss: {loss.loss.mean():.2f}", "epoch:", i, 'acc:', f"{accuracy(np.array(y_true_list), np.array(y_pred_list)):.2f}") >>>loss: 4.56 index: 0 acc: 0.12 loss: 0.66 index: 20 acc: 0.59 loss: 0.74 epoch: 0 acc: 0.65 loss: 0.59 index: 0 acc: 0.78 loss: 1.24 index: 20 acc: 0.74 loss: 0.55 index: 40 acc: 0.77 loss: 0.54 index: 60 acc: 0.78 loss: 1.45 epoch: 1 acc: 0.78 loss: 0.49 index: 0 acc: 0.84 ...
Мы сделали большую работу! Добавили много слоев, реализовали несколько алгоритмов градиентого спуска и обучили свёрточную модель для задачи классификации изображений!
Вот и на этом вторая часть подошла к концу. Полный код можно найти тут
В третьей части я планирую:
-
представить аналог
pytorch.tensor() -
перевести все вычисления на динамический вычислительный граф
-
добавить Embedding, LayerNorm, ModuleList
-
провести рефакторинг библиотеки
-
добавить перенос вычислений на gpu
-
написать на библиотеке gpt2-1.5В и запустить его
ссылка на оригинал статьи https://habr.com/ru/articles/869520/
Добавить комментарий