Пишем numpy-модуль для ускорения математических функций с помощью SIMD-инструкций

от автора

Пакеты numpy и scipy предоставляют прекрасные возможности для быстрого решения различных вычислительных задач. Концепция универсальных функций (ufunc), работающих как со скалярными значениями, так и с массивами различных размерностей, позволяет получить высокую производительность при сохранении присущей языку Python простоты и элегантности. Универсальная функция обычно используются для выполнения одной операции над большим массивом данных, что идеально подходит для оптимизации с помощью SIMD-инструкций, однако мне не удалось найти готового решения, основанного на свободном программном обеспечении и позволяющего использовать SIMD для вычисления в numpy таких математических функций, как синус, косинус и экспонента. Реализовывать алгоритмы вычисления этих функций с нуля совсем не хотелось, но к счастью в интернете нашлось несколько свободных библиотек на языке «С». Преодолев лень сомнения, я решил написать собственный numpy-модуль, предлагающий универсальные функции для синуса, косинуса и экспоненты. За подробностями и результатами тестов добро пожаловать под кат.

Немного о SIMD-инструкциях

SIMD-инструкции позволяют одновременно производить один и тот же набор операций над несколькими числами, записанными в одном регистре. Таким образом, за один такт можно обрабатывать сразу несколько чисел и потенциально увеличить производительность в несколько раз.
Например, набор SIMD-инструкций Advanced Vector Extensions (AVX) позволяет выполнять операции с 256-битными регистрами, каждый из которых может включать восемь 32-разрядных чисел с плавающей точкой (числа одинарной точности), либо четыре 64-разрядных (числа двойной точности). Набор операций довольно скромный, в основном это сложение, вычитание, умножение и деление. Точная и быстрая реализация тригонометрических функций с помощью этих операций — задача не тривиальная и, чтобы не изобретать велосипед, стоит использовать какую-нибудь готовую библиотеку. Помимо проприетарной Intel MKL(которая уже умеет работать с numpy) вариантов нашлось всего три (раз, два, три).
Первый вариант представляет собой заголовочный файл с очень скромным набором функций, практически без документации и тестов. Второй вариант — С++ библиотека vecmathlib, которая у меня почему-то упорно отказывалась компилироваться, несмотря на использование рекомендованного компилятора GCC-4.7. Вариант перспективный, но пока еще похоже сырой. Третий вариант — библиотеку SLEEF — удалось найти именно благодаря vecmathlib, которая использует его кодовую базу. Вариант мне сразу понравился простотой и ясностью кода, а также обилием тестов.

Небольшой тест для мотивации

Чтобы получить достаточную мотивацию для написания модуля, а заодно разобраться с использованием SLEEF, я решил сравнить скорость вычисления синуса на языке «C» при использовании SLEEF со стандартной math.h. Естественно, речь идет о поэлементном вычислении синуса для большого массива данных.
К сожалению, документации и примеров в SLEEF практически нет, но зато есть вполне понятно написанные тесты, так что разобраться с использованием библиотеки было не сложно. Исходный код SLEEF состоит из четырех директорий: java, purec, simd и tester. Кроме этого, там лежит файл README с кратким описанием библиотеки и общий Makefile, дергающий Makefile из перечисленных директорий. Меня естественно больше всего заинтересовала директория simd, в которой, как можно догадаться из названия, содержались оптимизированные с помощью SIMD-инструкций функции.
Из Makefile в директории simd понятно, что поддерживаются 4 варианта SIMD-инструкций: SSE2, AVX, AVX2 и FMA4. Прототипы функций определены в файле sleefsimd.h, а нужный набор инструкций выбирается при компиляции с помощью флагов -DENABLE_SSE2, -DENABLE_AVX, -DENABLE_AVX2 или -DENABLE_FMA4. Makefile собирает исполняемые файлы для тестирования функций с использованием каждого из наборов инструкций: iutsse2, iutavx, iutavx2 или iutfma4. Эти файлы вызываются из универсальной программы tester (из директории tester) и осуществляют выполнение получаемых от tester команд. Реализация команд находится в файле iut.c, откуда становится очевидным способ использования библиотеки.

Функция тестирования синуса из файла simd/iut.c исходного кода SLEEF

double xxsin(double d) {    double s[VECTLENDP];   int i;   for(i=0;i<VECTLENDP;i++) {     s[i] = random()/(double)RAND_MAX*20000-10000;   }   int idx = random() & (VECTLENDP-1);   s[idx] = d;     vdouble a = vloadu(s);   a = xsin(a);   vstoreu(s, a);    return s[idx]; } 

Для чисел двойной точности (double) нужно определить массив длиной VECTLENDP, заполнить аргументами интересующей нас функции и передать в функцию vloadu, которая скопирует их в необходимое для использования SIMD место и вернет значение типа vdouble. Значение vdouble мы передаем функции xsin, которая вычисляет значение синуса сразу для всех VECLENDP аргументов и возвращает опять же vdouble. Результат распаковывается в массив из double с помощью функции vstoreu.
Для желающих проверить SLEEF на своей машине привожу полный исходный код программы, которую написал для оценки потенциального ускорения от использования SIMD с помощью SLEEF.

Программа для оценки скорости вычисления синуса с помощью SLEEF

#include <stdio.h> #include <stdlib.h> #include <time.h> #include <math.h> #include "sleefsimd.h"  #define TESTSIZE (VECTLENDP*10000000) double s[TESTSIZE]; double r1[TESTSIZE]; double r2[TESTSIZE]; #define COUNT 10 int main(int argc, char *argv[]) { 	int k, i;     clock_t t1, t2;     double time1, time2;     double max, rmax;      srandom(time(NULL)); 	for(i = 0; i < TESTSIZE; i++)      { 		s[i] = random()/(double)RAND_MAX*20000-10000; 	}  	printf("Testing sin, %d values\n", TESTSIZE*COUNT); 	t1 = clock(); 	for(k = 0; k < COUNT; k++) 	{ 		for(i = 0; i < TESTSIZE; i++)          { 			r1[i] = sin(s[i]); 		} 	}     t2 = clock();     time1 = (double)(t2 - t1)/CLOCKS_PER_SEC; 	printf("Finish sin, spent time = %lg sec\n\n", time1);  	printf("Testing xsin\n"); 	t1 = clock(); 	for(k = 0; k < COUNT; k++) 	{ 		for(i = 0; i < TESTSIZE; i += VECTLENDP)          { 			vdouble a = vloadu(s+i); 			a = xsin(a); 			vstoreu(r2+i, a); 		} 	}     t2 = clock();     time2 = (double)(t2 - t1)/CLOCKS_PER_SEC; 	printf("Finish xsin, spent time = %lg sec\n\n", time2);  	printf("Speed ratio: %lf\n", time1/time2);  	max = r1[0] - r2[0];     rmax = (r1[0] - r2[0])/r1[0]; 	for(i = 0; i < TESTSIZE; i++)      { 		double delta = (r1[i] - r2[i]); 		if(abs(delta) > abs(max)) max = delta;         delta = (r1[i] - r2[i])/r1[i]; 		if(abs(delta) > abs(max)) rmax = delta; 	}  	printf("Max absolute delta: %lg, relative delta %lg\n", max, rmax); 	return 0; } 

Самый продвинутый из поддерживаемых на моем компьютере набор команд — это AVX, поэтому я компилировал программу (записанную в файл simd/speedtest.c в исходниках SLEEF) следующей командой:

gcc -O3 -Wall -Wno-unused -Wno-attributes -DENABLE_AVX -mavx speedtest.c sleefsimddp.c sleefsimdsp.c -o speedtest -lm 

Я ожидал ускорения примерно в 4 раза, но результат превзошел все мои ожидания:

Testing sin, 400000000 values Finish sin, spent time = 14.95 sec  Testing xsin Finish xsin, spent time = 1.31 sec  Speed ratio: 11.412214 Max absolute delta: 5.55112e-17, relative delta 1.58441e-16 

Ускорение более чем в 10 раз, при относительной погрешности вычисления менее 2·10-16 (порядка точности самого double), на одном ядре процессора! Конечно, в реальном приложении ускорение будет меньше, но мотивации для написания своего numpy-модуля уже достаточно.

Пару слов об универсальных функциях

В numpy данные представляются в виде многомерных массивов. Универсальные функции поэлементно работают с массивами любой размерности, причем в случае нескольких параметров их размерность может не совпадать. Параметры универсальной функции сначала приводятся к одной размерности в соответствии со специальными правилами (это называется Broadcasting), а затем необходимые вычисления проводятся поэлементно. На выходе получается массив наибольшей из размерностей.
Например, одна и та же функция add (которая автоматически применяется при использовании оператора "+" для numpy-массивов) позволяет сложить как два числа или одномерных массива, так и добавить число к массиву или одномерный массив к двумерному.

Пример

>>> from numpy import array, add  >>> add(1, 2) 3  >>> add(array([1,2]), array([4,5])) # поэлементно складываем два массива одинаковой размерности array([5, 7])  >>> add(array([1,2]), 1) # добавляем к одномерному массиву число (т.е. массив нулевой размерности) array([2, 3])  >>> add(array([[1,2],[3,4]]), array([1,2])) # добавляем к двумерному массиву одномерный (построчно) array([[2, 4],        [4, 6]]) 

Более подробную информацию по numpy на английском можно найти в официальной документации, на русском — например здесь.

Пишем свой numpy-модуль с универсальной функцией и SIMD-инструкциями

У numpy и skipy довольно удобный API и неплохая документация, в которой для желающих написать собственный numpy-модуль с универсальной функцией есть соответствующий tutorial. Сначала мы пишем C-функцию, в линейном цикле вычисляющую значение интересующей нас математической функции от скалярного аргумента:

Фукнция для вычисления синуса в numpy-модуле

static void double_xsin(char **args, npy_intp *dimensions,                             npy_intp* steps, void* data) {     npy_intp i;     npy_intp n = dimensions[0];     char *in = args[0], *out = args[1];     npy_intp in_step = steps[0], out_step = steps[1];     double tmp[VECTLENDP];     vdouble a;     int slow_n = n % VECTLENDP;     if(in_step != sizeof(double) || out_step != sizeof(double))         slow_n = n;     for(i = 0; i < slow_n; i += VECTLENDP)     {         int j;         for(j = 0; j < VECTLENDP && i + j < slow_n; j++)         {             tmp[j] = *(double *)in;             in += in_step;                     }         a = vloadu(tmp);         a = xsin(a);         vstoreu(tmp, a);         for(j = 0; j < VECTLENDP && i + j < slow_n; j++)         {             *(double *)out = tmp[j];             out += out_step;         }             }     if(n > slow_n)     {         double *in_array = (double *)in;         double *out_array = (double *)out;         for(i = 0; i < n - slow_n; i += VECTLENDP)         {             a = vloadu(in_array + i);         	a = xsin(a); 	        vstoreu(out_array + i, a);             }     } } 

Указатели на входные и выходные данные numpy передает нам в массиве args. В нашем случае у функции один вход и один выход, поэтому адрес входных данных args[0], выходных — args[1]. Количество элементов передается в dimensions[0]. Для перемещения по входным данным нужно увеличивать указатель на steps[0], по выходным — на steps[1] (важно, что указатель имеет тип char, поскольку в steps указаны значения в байтах). К сожалению, мне не удалось найти в документации numpy утверждения, что значения в steps должны быть равны размерам соответствующих типов данных, хотя эксперимент показал, что на моей системе для массивов ненулевой размерности это правило выполняется. В случае его нарушения вычисления будут медленнее, поскольку потребуется дополнительное копирование элементов в массив tmp и из него.
Одна и та же универсальная функция в numpy может работать с различными типами данных, но для каждого типа данных пишется отдельная C-функция. При регистрации универсальной функции мы указываем поддерживаемые типы и для каждой комбинации входных и выходных типов передаем указатель на С-функцию, которая будет с этой комбинацией работать:

static PyUFuncGenericFunction funcs[] = {&double_xsin}; static char types[] = {NPY_DOUBLE, NPY_DOUBLE}; static void *data[] = {NULL}; 

В массиве types указываются как входные, так и выходные типы, поэтому он длиннее массивов funcs и data. Массив указателей data позволяет для каждой комбинации типов указать свой дополнительный параметр, который будет передан в C-фунцкию в качестве последнего аргумента void* data. В частности, это можно использовать для реализации разных универсальных функций с помощью одной C-функции.
Чтобы зарегистрировать нашу универсальную функцию, нужно вызвать PyUFunc_FromFuncAndData и передать ей описанные выше массивы (funcs, data и types), а также количество входных и выходных аргументов универсальной функции, количество поддерживаемых комбинаций типов, имя функции в модуле и строку документации.

Полный исходный код модуля

#include "Python.h" #include "numpy/ndarraytypes.h" #include "numpy/ufuncobject.h" #include "numpy/npy_3kcompat.h" #include "sleef/sleefsimd.h"  /* The loop definition must precede the PyMODINIT_FUNC. */ static void double_xsin(char **args, npy_intp *dimensions,                             npy_intp* steps, void* data) {     npy_intp i;     npy_intp n = dimensions[0];     char *in = args[0], *out = args[1];     npy_intp in_step = steps[0], out_step = steps[1];     double tmp[VECTLENDP];     vdouble a;     int slow_n = n % VECTLENDP;     if(in_step != sizeof(double) || out_step != sizeof(double))         slow_n = n;     for(i = 0; i < slow_n; i += VECTLENDP)     {         int j;         for(j = 0; j < VECTLENDP && i + j < slow_n; j++)         {             tmp[j] = *(double *)in;             in += in_step;                     }         a = vloadu(tmp);         a = xsin(a);         vstoreu(tmp, a);         for(j = 0; j < VECTLENDP && i + j < slow_n; j++)         {             *(double *)out = tmp[j];             out += out_step;         }             }     if(n > slow_n)     {         double *in_array = (double *)in;         double *out_array = (double *)out;         for(i = 0; i < n - slow_n; i += VECTLENDP)         {             a = vloadu(in_array + i);         	a = xsin(a); 	        vstoreu(out_array + i, a);             }     } }  static PyMethodDef AvxmathMethods[] = {         {NULL, NULL, 0, NULL} };  static PyUFuncGenericFunction funcs[1] = {&double_xsin}; static char types[] = {NPY_DOUBLE, NPY_DOUBLE}; static void *data[] = {NULL};  void register_xsin(PyObject *module) {     PyObject *xsin, *d;     import_array();     import_umath();      xsin = PyUFunc_FromFuncAndData(funcs, data, types, 1, 1, 1,                                     PyUFunc_None, "sin",                                     "AVX-accelerated sine calculation", 0);     d = PyModule_GetDict(module);     PyDict_SetItemString(d, "sin", xsin);     Py_DECREF(xsin); }  #if PY_VERSION_HEX >= 0x03000000 static struct PyModuleDef moduledef = {     PyModuleDef_HEAD_INIT,     "avxmath",     NULL,     -1,     AvxmathMethods,     NULL,     NULL,     NULL,     NULL };  PyMODINIT_FUNC PyInit_avxmath(void) {     PyObject *m;     m = PyModule_Create(&moduledef);     if (!m) {         return NULL;     }     register_xsin(m);     return m; } #else PyMODINIT_FUNC initavxmath(void) {     PyObject *m;     m = Py_InitModule("avxmath", AvxmathMethods);     if (m == NULL) {         return;     }     register_xsin(m); } #endif 

Для сборки модуля я использовал стандартный setup.py из документации, заменив название модуля и добавив C-файлы библиотеки SLEEF, флаги компилятора и линковщика. Приведенный выше исходный код модуля я сохранил рядом с setup.py в файле с именем avxmath.c, директорию simd из исходного кода SLEEF переименовал в sleef и также положил рядом с setup.py.

Файл setup.py для сборки модуля avxmath

def configuration(parent_package='', top_path=None):     import numpy     from numpy.distutils.misc_util import Configuration      config = Configuration('',                            parent_package,                            top_path)     config.add_extension('avxmath', ['avxmath.c', 'sleef/sleefsimddp.c', 'sleef/sleefsimdsp.c'],                           extra_compile_args=['-O3', '-Wall', '-Wno-unused', '-Wno-attributes', '-DENABLE_AVX','-mavx'],                           extra_link_args=['-lm'])      return config  if __name__ == "__main__":     from numpy.distutils.core import setup     setup(configuration=configuration) 

Для компиляции без установки в систему нужно выполнить команду python setup.py build_ext --inplace, результатом успешного выполнения которой должен стать готовый модуль в файле avxmath.so. Теперь можно проверить работоспособность нашей функции. Запускаем python в той же директории, где находится файл avxmath.so, и проверяем:

>>> from numpy import array, pi >>> import avxmath >>> avxmath.sin(0) 0.0 >>> avxmath.sin(pi) 1.2246467991473532e-16 >>> avxmath.sin([0, pi/2, pi, 3*pi/2, 2*pi]) array([  0.00000000e+00,   1.00000000e+00,   1.22464680e-16,         -1.00000000e+00,  -2.44929360e-16]) >>>  

Убедившись, что модуль avxmath импортируется и работает без ошибок, можно сделать небольшой тест производительности и точности новой функции.

Программа проверки функции sin модуля avxmath и результат ее выполнения

import numpy  import avxmath import time from numpy import random, pi  COUNT=10  x = 2e4*random.random(40000000) - 1e4 t = time.clock() for i in xrange(COUNT):     y1 = numpy.sin(x) duration1 = time.clock() - t print "numpy.sin %f sec" % duration1  t = time.clock() for i in xrange(COUNT):     y2 = avxmath.sin(x) duration2 = time.clock() - t print "avxmath.sin %f sec" % duration2  delta = y2 - y1 rdelta = delta/y1 print "max absolute difference is %lg, relative %lg" % (         delta[abs(delta).argmax()], rdelta[abs(rdelta).argmax()]) print "speedup is %lg" % (duration1/duration2) 

numpy.sin 15.510000 sec avxmath.sin 2.260000 sec max absolute difference is 2.22045e-16, relative 2.63873e-16 speedup is 6.86283 

Итак, мы получили ускорение более чем в 6 раз при точности вычислений не хуже 3·10-16! Заменив вызовы функции xsin на простое копирование памяти, нетрудно убедиться, что ускорение в 10 раз не получилось из-за того, что около 1 секунды из полученных нами 2.26 секунд выполнения ушло на накладные расходы. Аналогично, заменив функцию xsin на обычный синус из math.h, мы обнаружим, что времена вычислений с помощью avxmath.sin и numpy.sin в нашем тесте с высокой точностью совпадут.
Таким образом, с помощью SIMD инструкций можно добиться значительного ускорения выполняемых с помощью numpy и scipy вычислений, просто заменив импорты обычных функций на оптимизированные. Ну и конечно исходные коды несколько расширенного по сравнению с данной статьей модуля avxmath доступны на Github по ссылке.

ссылка на оригинал статьи http://habrahabr.ru/post/198916/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *