Немного о SIMD-инструкциях
SIMD-инструкции позволяют одновременно производить один и тот же набор операций над несколькими числами, записанными в одном регистре. Таким образом, за один такт можно обрабатывать сразу несколько чисел и потенциально увеличить производительность в несколько раз.
Например, набор SIMD-инструкций Advanced Vector Extensions (AVX) позволяет выполнять операции с 256-битными регистрами, каждый из которых может включать восемь 32-разрядных чисел с плавающей точкой (числа одинарной точности), либо четыре 64-разрядных (числа двойной точности). Набор операций довольно скромный, в основном это сложение, вычитание, умножение и деление. Точная и быстрая реализация тригонометрических функций с помощью этих операций — задача не тривиальная и, чтобы не изобретать велосипед, стоит использовать какую-нибудь готовую библиотеку. Помимо проприетарной Intel MKL(которая уже умеет работать с numpy) вариантов нашлось всего три (раз, два, три).
Первый вариант представляет собой заголовочный файл с очень скромным набором функций, практически без документации и тестов. Второй вариант — С++ библиотека vecmathlib, которая у меня почему-то упорно отказывалась компилироваться, несмотря на использование рекомендованного компилятора GCC-4.7. Вариант перспективный, но пока еще похоже сырой. Третий вариант — библиотеку SLEEF — удалось найти именно благодаря vecmathlib, которая использует его кодовую базу. Вариант мне сразу понравился простотой и ясностью кода, а также обилием тестов.
Небольшой тест для мотивации
Чтобы получить достаточную мотивацию для написания модуля, а заодно разобраться с использованием SLEEF, я решил сравнить скорость вычисления синуса на языке «C» при использовании SLEEF со стандартной math.h
. Естественно, речь идет о поэлементном вычислении синуса для большого массива данных.
К сожалению, документации и примеров в SLEEF практически нет, но зато есть вполне понятно написанные тесты, так что разобраться с использованием библиотеки было не сложно. Исходный код SLEEF состоит из четырех директорий: java
, purec
, simd
и tester
. Кроме этого, там лежит файл README с кратким описанием библиотеки и общий Makefile, дергающий Makefile из перечисленных директорий. Меня естественно больше всего заинтересовала директория simd
, в которой, как можно догадаться из названия, содержались оптимизированные с помощью SIMD-инструкций функции.
Из Makefile в директории simd
понятно, что поддерживаются 4 варианта SIMD-инструкций: SSE2, AVX, AVX2 и FMA4. Прототипы функций определены в файле sleefsimd.h
, а нужный набор инструкций выбирается при компиляции с помощью флагов -DENABLE_SSE2
, -DENABLE_AVX
, -DENABLE_AVX2
или -DENABLE_FMA4
. Makefile собирает исполняемые файлы для тестирования функций с использованием каждого из наборов инструкций: iutsse2
, iutavx
, iutavx2
или iutfma4
. Эти файлы вызываются из универсальной программы tester (из директории tester
) и осуществляют выполнение получаемых от tester команд. Реализация команд находится в файле iut.c
, откуда становится очевидным способ использования библиотеки.
double xxsin(double d) { double s[VECTLENDP]; int i; for(i=0;i<VECTLENDP;i++) { s[i] = random()/(double)RAND_MAX*20000-10000; } int idx = random() & (VECTLENDP-1); s[idx] = d; vdouble a = vloadu(s); a = xsin(a); vstoreu(s, a); return s[idx]; }
Для чисел двойной точности (double
) нужно определить массив длиной VECTLENDP
, заполнить аргументами интересующей нас функции и передать в функцию vloadu
, которая скопирует их в необходимое для использования SIMD место и вернет значение типа vdouble
. Значение vdouble
мы передаем функции xsin
, которая вычисляет значение синуса сразу для всех VECLENDP
аргументов и возвращает опять же vdouble
. Результат распаковывается в массив из double
с помощью функции vstoreu
.
Для желающих проверить SLEEF на своей машине привожу полный исходный код программы, которую написал для оценки потенциального ускорения от использования SIMD с помощью SLEEF.
#include <stdio.h> #include <stdlib.h> #include <time.h> #include <math.h> #include "sleefsimd.h" #define TESTSIZE (VECTLENDP*10000000) double s[TESTSIZE]; double r1[TESTSIZE]; double r2[TESTSIZE]; #define COUNT 10 int main(int argc, char *argv[]) { int k, i; clock_t t1, t2; double time1, time2; double max, rmax; srandom(time(NULL)); for(i = 0; i < TESTSIZE; i++) { s[i] = random()/(double)RAND_MAX*20000-10000; } printf("Testing sin, %d values\n", TESTSIZE*COUNT); t1 = clock(); for(k = 0; k < COUNT; k++) { for(i = 0; i < TESTSIZE; i++) { r1[i] = sin(s[i]); } } t2 = clock(); time1 = (double)(t2 - t1)/CLOCKS_PER_SEC; printf("Finish sin, spent time = %lg sec\n\n", time1); printf("Testing xsin\n"); t1 = clock(); for(k = 0; k < COUNT; k++) { for(i = 0; i < TESTSIZE; i += VECTLENDP) { vdouble a = vloadu(s+i); a = xsin(a); vstoreu(r2+i, a); } } t2 = clock(); time2 = (double)(t2 - t1)/CLOCKS_PER_SEC; printf("Finish xsin, spent time = %lg sec\n\n", time2); printf("Speed ratio: %lf\n", time1/time2); max = r1[0] - r2[0]; rmax = (r1[0] - r2[0])/r1[0]; for(i = 0; i < TESTSIZE; i++) { double delta = (r1[i] - r2[i]); if(abs(delta) > abs(max)) max = delta; delta = (r1[i] - r2[i])/r1[i]; if(abs(delta) > abs(max)) rmax = delta; } printf("Max absolute delta: %lg, relative delta %lg\n", max, rmax); return 0; }
Самый продвинутый из поддерживаемых на моем компьютере набор команд — это AVX, поэтому я компилировал программу (записанную в файл simd/speedtest.c
в исходниках SLEEF) следующей командой:
gcc -O3 -Wall -Wno-unused -Wno-attributes -DENABLE_AVX -mavx speedtest.c sleefsimddp.c sleefsimdsp.c -o speedtest -lm
Я ожидал ускорения примерно в 4 раза, но результат превзошел все мои ожидания:
Testing sin, 400000000 values Finish sin, spent time = 14.95 sec Testing xsin Finish xsin, spent time = 1.31 sec Speed ratio: 11.412214 Max absolute delta: 5.55112e-17, relative delta 1.58441e-16
Ускорение более чем в 10 раз, при относительной погрешности вычисления менее 2·10-16 (порядка точности самого double
), на одном ядре процессора! Конечно, в реальном приложении ускорение будет меньше, но мотивации для написания своего numpy-модуля уже достаточно.
Пару слов об универсальных функциях
В numpy данные представляются в виде многомерных массивов. Универсальные функции поэлементно работают с массивами любой размерности, причем в случае нескольких параметров их размерность может не совпадать. Параметры универсальной функции сначала приводятся к одной размерности в соответствии со специальными правилами (это называется Broadcasting), а затем необходимые вычисления проводятся поэлементно. На выходе получается массив наибольшей из размерностей.
Например, одна и та же функция add (которая автоматически применяется при использовании оператора "+" для numpy-массивов) позволяет сложить как два числа или одномерных массива, так и добавить число к массиву или одномерный массив к двумерному.
>>> from numpy import array, add >>> add(1, 2) 3 >>> add(array([1,2]), array([4,5])) # поэлементно складываем два массива одинаковой размерности array([5, 7]) >>> add(array([1,2]), 1) # добавляем к одномерному массиву число (т.е. массив нулевой размерности) array([2, 3]) >>> add(array([[1,2],[3,4]]), array([1,2])) # добавляем к двумерному массиву одномерный (построчно) array([[2, 4], [4, 6]])
Более подробную информацию по numpy на английском можно найти в официальной документации, на русском — например здесь.
Пишем свой numpy-модуль с универсальной функцией и SIMD-инструкциями
У numpy и skipy довольно удобный API и неплохая документация, в которой для желающих написать собственный numpy-модуль с универсальной функцией есть соответствующий tutorial. Сначала мы пишем C-функцию, в линейном цикле вычисляющую значение интересующей нас математической функции от скалярного аргумента:
static void double_xsin(char **args, npy_intp *dimensions, npy_intp* steps, void* data) { npy_intp i; npy_intp n = dimensions[0]; char *in = args[0], *out = args[1]; npy_intp in_step = steps[0], out_step = steps[1]; double tmp[VECTLENDP]; vdouble a; int slow_n = n % VECTLENDP; if(in_step != sizeof(double) || out_step != sizeof(double)) slow_n = n; for(i = 0; i < slow_n; i += VECTLENDP) { int j; for(j = 0; j < VECTLENDP && i + j < slow_n; j++) { tmp[j] = *(double *)in; in += in_step; } a = vloadu(tmp); a = xsin(a); vstoreu(tmp, a); for(j = 0; j < VECTLENDP && i + j < slow_n; j++) { *(double *)out = tmp[j]; out += out_step; } } if(n > slow_n) { double *in_array = (double *)in; double *out_array = (double *)out; for(i = 0; i < n - slow_n; i += VECTLENDP) { a = vloadu(in_array + i); a = xsin(a); vstoreu(out_array + i, a); } } }
Указатели на входные и выходные данные numpy передает нам в массиве args
. В нашем случае у функции один вход и один выход, поэтому адрес входных данных args[0]
, выходных — args[1]
. Количество элементов передается в dimensions[0]
. Для перемещения по входным данным нужно увеличивать указатель на steps[0]
, по выходным — на steps[1]
(важно, что указатель имеет тип char
, поскольку в steps
указаны значения в байтах). К сожалению, мне не удалось найти в документации numpy утверждения, что значения в steps
должны быть равны размерам соответствующих типов данных, хотя эксперимент показал, что на моей системе для массивов ненулевой размерности это правило выполняется. В случае его нарушения вычисления будут медленнее, поскольку потребуется дополнительное копирование элементов в массив tmp
и из него.
Одна и та же универсальная функция в numpy может работать с различными типами данных, но для каждого типа данных пишется отдельная C-функция. При регистрации универсальной функции мы указываем поддерживаемые типы и для каждой комбинации входных и выходных типов передаем указатель на С-функцию, которая будет с этой комбинацией работать:
static PyUFuncGenericFunction funcs[] = {&double_xsin}; static char types[] = {NPY_DOUBLE, NPY_DOUBLE}; static void *data[] = {NULL};
В массиве types
указываются как входные, так и выходные типы, поэтому он длиннее массивов funcs
и data
. Массив указателей data
позволяет для каждой комбинации типов указать свой дополнительный параметр, который будет передан в C-фунцкию в качестве последнего аргумента void* data
. В частности, это можно использовать для реализации разных универсальных функций с помощью одной C-функции.
Чтобы зарегистрировать нашу универсальную функцию, нужно вызвать PyUFunc_FromFuncAndData
и передать ей описанные выше массивы (funcs
, data
и types
), а также количество входных и выходных аргументов универсальной функции, количество поддерживаемых комбинаций типов, имя функции в модуле и строку документации.
#include "Python.h" #include "numpy/ndarraytypes.h" #include "numpy/ufuncobject.h" #include "numpy/npy_3kcompat.h" #include "sleef/sleefsimd.h" /* The loop definition must precede the PyMODINIT_FUNC. */ static void double_xsin(char **args, npy_intp *dimensions, npy_intp* steps, void* data) { npy_intp i; npy_intp n = dimensions[0]; char *in = args[0], *out = args[1]; npy_intp in_step = steps[0], out_step = steps[1]; double tmp[VECTLENDP]; vdouble a; int slow_n = n % VECTLENDP; if(in_step != sizeof(double) || out_step != sizeof(double)) slow_n = n; for(i = 0; i < slow_n; i += VECTLENDP) { int j; for(j = 0; j < VECTLENDP && i + j < slow_n; j++) { tmp[j] = *(double *)in; in += in_step; } a = vloadu(tmp); a = xsin(a); vstoreu(tmp, a); for(j = 0; j < VECTLENDP && i + j < slow_n; j++) { *(double *)out = tmp[j]; out += out_step; } } if(n > slow_n) { double *in_array = (double *)in; double *out_array = (double *)out; for(i = 0; i < n - slow_n; i += VECTLENDP) { a = vloadu(in_array + i); a = xsin(a); vstoreu(out_array + i, a); } } } static PyMethodDef AvxmathMethods[] = { {NULL, NULL, 0, NULL} }; static PyUFuncGenericFunction funcs[1] = {&double_xsin}; static char types[] = {NPY_DOUBLE, NPY_DOUBLE}; static void *data[] = {NULL}; void register_xsin(PyObject *module) { PyObject *xsin, *d; import_array(); import_umath(); xsin = PyUFunc_FromFuncAndData(funcs, data, types, 1, 1, 1, PyUFunc_None, "sin", "AVX-accelerated sine calculation", 0); d = PyModule_GetDict(module); PyDict_SetItemString(d, "sin", xsin); Py_DECREF(xsin); } #if PY_VERSION_HEX >= 0x03000000 static struct PyModuleDef moduledef = { PyModuleDef_HEAD_INIT, "avxmath", NULL, -1, AvxmathMethods, NULL, NULL, NULL, NULL }; PyMODINIT_FUNC PyInit_avxmath(void) { PyObject *m; m = PyModule_Create(&moduledef); if (!m) { return NULL; } register_xsin(m); return m; } #else PyMODINIT_FUNC initavxmath(void) { PyObject *m; m = Py_InitModule("avxmath", AvxmathMethods); if (m == NULL) { return; } register_xsin(m); } #endif
Для сборки модуля я использовал стандартный setup.py
из документации, заменив название модуля и добавив C-файлы библиотеки SLEEF, флаги компилятора и линковщика. Приведенный выше исходный код модуля я сохранил рядом с setup.py
в файле с именем avxmath.c
, директорию simd
из исходного кода SLEEF переименовал в sleef
и также положил рядом с setup.py
.
def configuration(parent_package='', top_path=None): import numpy from numpy.distutils.misc_util import Configuration config = Configuration('', parent_package, top_path) config.add_extension('avxmath', ['avxmath.c', 'sleef/sleefsimddp.c', 'sleef/sleefsimdsp.c'], extra_compile_args=['-O3', '-Wall', '-Wno-unused', '-Wno-attributes', '-DENABLE_AVX','-mavx'], extra_link_args=['-lm']) return config if __name__ == "__main__": from numpy.distutils.core import setup setup(configuration=configuration)
Для компиляции без установки в систему нужно выполнить команду python setup.py build_ext --inplace
, результатом успешного выполнения которой должен стать готовый модуль в файле avxmath.so
. Теперь можно проверить работоспособность нашей функции. Запускаем python в той же директории, где находится файл avxmath.so
, и проверяем:
>>> from numpy import array, pi >>> import avxmath >>> avxmath.sin(0) 0.0 >>> avxmath.sin(pi) 1.2246467991473532e-16 >>> avxmath.sin([0, pi/2, pi, 3*pi/2, 2*pi]) array([ 0.00000000e+00, 1.00000000e+00, 1.22464680e-16, -1.00000000e+00, -2.44929360e-16]) >>>
Убедившись, что модуль avxmath
импортируется и работает без ошибок, можно сделать небольшой тест производительности и точности новой функции.
import numpy import avxmath import time from numpy import random, pi COUNT=10 x = 2e4*random.random(40000000) - 1e4 t = time.clock() for i in xrange(COUNT): y1 = numpy.sin(x) duration1 = time.clock() - t print "numpy.sin %f sec" % duration1 t = time.clock() for i in xrange(COUNT): y2 = avxmath.sin(x) duration2 = time.clock() - t print "avxmath.sin %f sec" % duration2 delta = y2 - y1 rdelta = delta/y1 print "max absolute difference is %lg, relative %lg" % ( delta[abs(delta).argmax()], rdelta[abs(rdelta).argmax()]) print "speedup is %lg" % (duration1/duration2)
numpy.sin 15.510000 sec avxmath.sin 2.260000 sec max absolute difference is 2.22045e-16, relative 2.63873e-16 speedup is 6.86283
Итак, мы получили ускорение более чем в 6 раз при точности вычислений не хуже 3·10-16! Заменив вызовы функции xsin
на простое копирование памяти, нетрудно убедиться, что ускорение в 10 раз не получилось из-за того, что около 1 секунды из полученных нами 2.26 секунд выполнения ушло на накладные расходы. Аналогично, заменив функцию xsin
на обычный синус из math.h
, мы обнаружим, что времена вычислений с помощью avxmath.sin
и numpy.sin
в нашем тесте с высокой точностью совпадут.
Таким образом, с помощью SIMD инструкций можно добиться значительного ускорения выполняемых с помощью numpy и scipy вычислений, просто заменив импорты обычных функций на оптимизированные. Ну и конечно исходные коды несколько расширенного по сравнению с данной статьей модуля avxmath
доступны на Github по ссылке.
ссылка на оригинал статьи http://habrahabr.ru/post/198916/
Добавить комментарий