Задача поиска данных часто встречается в: веб-сервисах, системах управления баз данных (СУБД), гео-поиске и аналитике.
Сначала для простоты объяснения поставим задачу поиска элементов полным проходом по массиву из 10 000 000 элементов (структур), содержащих 5 полей с диапазонами значений: amount_of_money(0-1000000), gender(0-1), age(0-100), code(0-1000000), height(0-300). А в следующих статьях добавим в решение индексный поиск.
Мы будем писать кроссплатформенно под MSVC11(MSVS2012) и GCC 4.7.2, и использовать в них частично реализованный стандарт C++11.
1. Решение на C
Простейшим решением этой задачи на C будет создать структуру битовых полей занимаемую 8 байт (общее правило, в отсутствии директивы #pragma pack(push,1)
, поля не могут пересекать границы размера их базовых типов, в нашем случае unsigned – 32 бита):
/* Fields */ enum T_field_enum { amount_of_money_e, gender_e, age_e, code_e, height_e, last_e }; struct T_cash_account_row { // 1 – double word (32 bits) unsigned code:20; // 0 - 1000000 unsigned gender:1; // 0 - 1 unsigned age:7; // 0 - 100 // 2 – double word (32 bits) unsigned amount_of_money:20; // 0 - 1000000 unsigned height:9; // 0 – 300 };
Выделить память под 10 000 000 таких элементов:
const size_t c_array_size = 10000000; struct T_cash_account_row *const array_ptr = ( struct T_cash_account_row *)calloc(c_array_size, sizeof(struct T_cash_account_row)); if (array_ptr == NULL) { printf ("calloc error\n"); exit(1); }
В цикле заполнить случайными значениями в пределах диапазонов заданных по условию:
/* Generate random data for the one row */ static inline struct T_cash_account_row generate_row() { struct T_cash_account_row cash_account_row; cash_account_row.age = rand() % 100; cash_account_row.amount_of_money = (rand() % 1000)*(rand() % 1000); cash_account_row.code = (rand() % 1000)*(rand() % 1000); cash_account_row.gender = rand() % 2; cash_account_row.height = rand() % 300; return cash_account_row; } /* ----------------------------------------------------------------------- */ /* in int main() { … } */ /* Fill table random data */ for(i = 0; i < c_array_size; ++i) array_ptr[i] = generate_row();
Создать структуру поискового запроса-фильтра, где last_e
– перечисление со значением числа полей в строке (C++03 7.2 Enumeration declarations):
/* Filters */ struct T_range_filters { struct T_cash_account_row begin, end; /* bytes array or bitset from https://gist.github.com/jmbr/667605 */ unsigned char use_filter[last_e]; }; /* ----------------------------------------------------------------------- */
Здесь use_filter[]
используется для указания – фильтровать ли по данному условию-полю или нет.
И производить поиск проверкой по заданным полям проходя по всем элементам массива в цикле:
/* Compare row with filters */ static inline unsigned char test_predicate(struct T_cash_account_row const*const row, struct T_range_filters const*const range_filters) { return (!range_filters->use_filter[amount_of_money_e] || (row->amount_of_money >= range_filters->begin.amount_of_money && row->amount_of_money <= range_filters->end.amount_of_money)) && (!range_filters->use_filter[gender_e] || (row->gender >= range_filters->begin.gender && row->gender <= range_filters->end.gender)) && (!range_filters->use_filter[age_e] || (row->age >= range_filters->begin.age && row->age <= range_filters->end.age)) && (!range_filters->use_filter[code_e] || (row->code >= range_filters->begin.code && row->code <= range_filters->end.code)) && (!range_filters->use_filter[height_e] || (row->height >= range_filters->begin.height && row->height <= range_filters->end.height)); } /* ----------------------------------------------------------------------- */ /* search */ static inline size_t search(struct T_cash_account_row const*const array_ptr, const size_t c_array_size, struct T_cash_account_row *const result_ptr, struct T_range_filters const*const range_filters) { size_t result_size = 0; size_t i; /* loop index */ for(i = 0; i < c_array_size; ++i) { if(test_predicate(array_ptr + i, range_filters)) result_ptr[result_size] = array_ptr[i], ++result_size; } return result_size; }
Ссылка на рабочий код целиком на GitHub.com
Казалось бы, что здесь можно ещё ускорить и оптимизировать при полном проходе без индексов?
- Развернуть цикл, для уменьшения числа сравнений с условием цикла и делать за одну итерацию несколько фильтраций test_predicate? – Это слишком мало по сравнению с от 5 до 15 сравнений нашей строки во встраиваемой функции test_predicate и по сравнению с обращением к RAM.
- Делать prefetching-cache? – Можно и на C, и на C++, но в рамках нашей задачи это мало что даст, т.к. многократный поиск и так закэширует в LLC(L3) сколько сможет, а весь массив в 80 МБ в любом случае не сможет.
- Использовать векторные команды сравнения из SSE: CMPSS, COMISS, UCOMISS? – Можно и в C, и в C++. Но эта оптимизация не переносима на отличные от x86/x64 процессоры такие, как ARM или Power[PC].
- Можно использовать ключи оптимизации компилятора и PDO, на C и C++. PDO – это в любом случае компромисс, т.к. создается оптимизированный код для одного пути исполнения программы в ущерб остальным путям, т.е. при каких-то входных данных будет быстрее, а при каких-то медленнее. Я же покажу как создать оптимизированный код под каждый из возможных путей исполнения, и только в наиболее критичной к скорости части программы.
На этом низкоуровневая (не архитектурная) оптимизация на C заканчивается, и любые из этих оптимизаций применимы на C++.
2. Как выглядит ещё одна оптимизация на C и C++
- Во-первых, приведенное выше на C решение легко компилируется на компиляторе C++ без каких-либо изменений, т.к. в большинстве случаев имеет место обратная совместимость.
Результат в онлайн компиляторе на C: ideone.com
Результат в онлайн компиляторе на C++11: ideone.com
Я закомментировал подсчет random seed/* srand (time(NULL)); */
, чтобы сравнивать результаты разных запусков программы. Но вы можете раскомментировать его и убедиться, что абсолютное время выполнения меняется, но относительное ускорение от моих оптимизаций остается примерно одинаковым.
- Во-вторых, особо ушлые C-разработчики могли предложить ещё одну оптимизацию – создать 2^5 = 32 варианта функций test_predicate/search на каждый вариант числа условий поиска, сохранить указатели на них в массив и выбирать нужный вариант во время исполнения в зависимости от условий поиска. Это заметно сократит число сравнений.
Допустим пришло условие поиска по 2-ум полям из 5-ти: age и code. Тогда вызываем функцию search_12()
:
/* Compare row with filters */ static inline unsigned char test_predicate_12(struct T_cash_account_row const*const __restrict row, struct T_range_filters const*const __restrict range_filters) { return (row->age >= range_filters->begin.age && row->age <= range_filters->end.age) && (row->code >= range_filters->begin.code && row->code <= range_filters->end.code); } /* ----------------------------------------------------------------------- */ /* search */ static inline size_t search_12(struct T_cash_account_row const*const __restrict array_ptr, const size_t c_array_size, struct T_cash_account_row *const __restrict result_ptr, struct T_range_filters const*const __restrict range_filters) { size_t result_size = 0; size_t i; /* loop index */ for(i = 0; i < c_array_size; ++i) { if(test_predicate_12(array_ptr + i, range_filters)) result_ptr[result_size] = array_ptr[i], ++result_size; } return result_size; }
Хотя число условий в этой функции и уменьшилось до 4-ех с 15-ти первоначальных. Реально же в варианте решения на C сравнений исполнялось из 15 только 9, для условия поиска по 2-ум полям – это видно в дизассемблере функции test_predicate
: github.com. Это происходило потому, что после каждого сравнения use_filter[] == false
шел условный переход на сравнения по следующему полю. Т.е. помимо этих 4-ех сравнений там исполнялись ещё только 5 сравнений с use_filter[]
.
Описываемое здесь решение, например, для поиска по 2-ум полям из 5-ти, даст ускорение в 1.3 раза. Неплохо, но есть небольшая проблемка – C-разработчикам придется вручную создать эти 32 функции, нигде в них не ошибиться перебирая все варианты и при любом изменении числа и имен полей менять нужные функции. Ну и если вам понадобиться подобное решение для таблицы с 10 полями, то вам придется создать 1024 функций. В общем просто сказка при доработке кода!
Дополнительно путаница создается при добавлении полей типов с нетривиальным сравнением, как например строка char[]
сравниваемая через strcmp()
. В C++ это решается созданием пользовательского типа с перегруженным оператором сравнения. (Операторы для фундаментальных типов в C++ перегружать нельзя – одним из параметров оператора обязательно должен быть пользовательский класс.)
А задача автоматического создания нужного числа оптимизированных функций на C++ легко решается раскруткой шаблонов (unrolling of template).
Кому-то может показаться, что это вполне можно решить на C и в run-time, и незачем городить 32 – 1024 функций оптимизированных в compile-time. Допустим создать массив указателей на функции числом равным числу условий, в нашем случае 5, и при каждом поиске заполнять этот массив только теми функциями с условиями, которые используются для данного поискового запроса. А в конце добавлять указатель на функцию возвращающую 1 (true). И каждая из таких функций получает указатель на массив функций, такого же типа, как и сама, и индекс следующей вызываемой функции. Разочарую, но в таком случае функции не встроятся (inline), а их вызов ничуть не быстрее сравнений с условными переходами.
Вот рабочий вариант этого run-time решения на C: GitHub.com
Как видим в MSVC скорость упала с 74 мс, до 84мс. А в GCC ещё больше – до 117мс. На C такая оптимизация не возможна, а возможна лишь оптимизация через создание большого числа функций.
3. Решение на C++
Раскрутка шаблонов осуществляется созданием экземпляра (инстанцированием) одного шаблона другим шаблоном, со значением параметра шаблона на единицу меньше, чем у создающего. А для шаблона со значением параметра 0 мы создаем пустую, ничего не делающую специализацию. В результате инстанцируя раскрутку шаблона с параметром N, мы получаем N – экземпляров раскручиваемого шаблона, в каждом из которых вызывается конструктор или inline
оператор вызова следующего экземпляра шаблона. В такой раскрутке могут участвовать, как шаблонные функции, так и шаблонные классы.
Чтобы вынести раскрутку из логики самих шаблонов, мы создадим шаблонный класс раскрутки. Одним параметром он будет принимать число, на которое необходимо раскрутить, а вторым параметром будет принимать шаблон, который нужно раскрутить:
// The templated constructor of unrolling of the class (no return, mandatory call to the constructor, called once) template<unsigned unroll_count, template<unsigned> class T> struct T_unroll_constructor { T_unroll_constructor<unroll_count-1, T> next_unroll; T<unroll_count-1> functor; template<typename T1> inline T_unroll_constructor(T1 & val1) : next_unroll(val1), functor(val1) {} }; // End of unroll template<template<unsigned> class T> struct T_unroll_constructor<0, T> { template<typename T1> inline T_unroll_constructor(T1 &) {} }; // -------------------------------------------------------------------------
Теперь создадим базовый абстрактный класс поиска. Унаследуем от него шаблонный дочерний класс, принимающий параметром шаблона 32-битное значение unsigned int
, каждый бит которого будет означать использовать ли соответствующий фильтр или нет:
// Abstract base class of filters for each search variant (range_filters) struct T_filter { // search virtual size_t search(T_cash_account_row const*const __restrict array_ptr, const size_t c_array_size, T_cash_account_row *const __restrict result_ptr, T_range_filters const*const __restrict range_filters) = 0; }; // ------------------------------------------------------------------------- // The filters for each search variant (range_filters) template<unsigned index_pred> struct T_custom_filter : T_filter { inline unsigned char test_predicate(T_cash_account_row const*const __restrict row, T_range_filters const*const __restrict range_filters) { return (!(index_pred & 1<<amount_of_money_e) || (row->amount_of_money >= range_filters->begin.amount_of_money && row->amount_of_money <= range_filters->end.amount_of_money)) && (!(index_pred & 1<<gender_e) || (row->gender >= range_filters->begin.gender && row->gender <= range_filters->end.gender)) && (!(index_pred & 1<<age_e) || (row->age >= range_filters->begin.age && row->age <= range_filters->end.age)) && (!(index_pred & 1<<code_e) || (row->code >= range_filters->begin.code && row->code <= range_filters->end.code)) && (!(index_pred & 1<<height_e) || (row->height >= range_filters->begin.height && row->height <= range_filters->end.height)); } // ------------------------------------------------------------------------- // search virtual size_t search(T_cash_account_row const*const __restrict array_ptr, const size_t c_array_size, T_cash_account_row *const __restrict result_ptr, T_range_filters const*const __restrict range_filters) final { size_t result_size = 0; size_t i; // loop index for(i = 0; i < c_array_size; ++i) { if(test_predicate(array_ptr + i, range_filters)) result_ptr[result_size] = array_ptr[i], ++result_size; } return result_size; } }; // -------------------------------------------------------------------------
Т.к. параметр шаблона index_pred
и перечисления amount_of_money_e, gender_e …
известны на этапе компиляции, то часть условий компилятор выкинет, как всегда верные. Фактически мы помогает компилятору оптимизировать нашу программу. Это самое важное в данном решение!
А теперь покажем, как развернется этот шаблонный дочерний класс template<unsigned index_pred> struct T_custom_filter
в 32 класса. Создадим 32 объекта каждого из них и сохраним на них указатели базового типа в статичный массив std::array<>
. А во время выполнения будем полиморфно обращаться к нужному объекту, в зависимости от условий поиска:
class T_optimized_search { // unroll tamplates template<unsigned index_pred> struct T_unroll_find { template<typename T> T_unroll_find(T &filters) { filters[index_pred].reset( new T_custom_filter<index_pred>() ); } }; // ------------------------------------------------------------------------- // Get index of T_test_pred version for current search range inline unsigned get_index_pred(T_range_filters const*const __restrict range_filters) { unsigned result = 0; for(size_t i = 0; i < last_e; ++i) result |= range_filters->use_filter[i]?(1<<i):0; return result; } std::array<std::unique_ptr<T_filter>, 1<<last_e> filters; T_unroll_constructor< 1<<last_e, T_unroll_find> fill_filter; public: T_optimized_search() : fill_filter(filters) {} // C++ optimized search inline size_t search(T_cash_account_row const*const __restrict array_ptr, const size_t c_array_size, T_cash_account_row *const __restrict result_ptr, T_range_filters const*const __restrict range_filters) { auto const& filter = filters[get_index_pred(range_filters)]; return filter->search(array_ptr, c_array_size, result_ptr, range_filters); } }; // -------------------------------------------------------------------------
Здесь функция unsigned get_index_pred((T_range_filters const*const __restrict range_filters)
возвращает индексный номер необходимого поискового объекта для данных условия поиска range_filters
.
Используется аналогичным образом, как и решение на C:
T_optimized_search optimized_search; // C++ optimized search result_size = optimized_search.search(array_ptr, c_array_size, result_ptr, &range_filters);
Вот сравнение дизассемблерного кода двух функций test_predicate
на C и оптимизированной на C++, скомпилированных на MSVC11(MSVS 2012), с моими комментариями – наглядно видна разница если смотреть через TortoiseDiff: ссылка diff на GitHub.com
Мы видим, что от 15 сравнений, 9 из которых при наших поисковых условиях исполняются, остались только 4 сравнения – ассемблерные команды cmp.
«Картинка disasm из TortoiseDiff c моими комментариями»
Фактически, с помощью шаблонов мы вынесли из цикла во вне проверку на использование каждого из фильтров. А внутри цикла получили значения использования фильтров use_filter[]
известные в compile-time, что позволило компилятору исключить их во время оптимизации. Т.е. данная оптимизация применима ко всем подобным случаям выноса вычислений или проверок из цикла во вне.
В примере на C++ я использовал C-style способ передачи параметров в функцию по константному указателю
*const
, чтобы в diff между C и C++ видеть изменения касающиеся только обсуждаемой оптимизации. Однако, используя интерфейсы в C++-style, функция может принимать параметры и по ссылке &, что исключит возможность забытьconst
после * и это несколько короче. Но Google C++ Style Guide рекомендует передавать неизменяемые параметры по константной ссылкеconst&
, а изменяемые по константному указателю*const
. Если код написан в таком стиле, то вы полностью контролируете изменение (или не изменение) ваших переменных передаваемых в чужую функцию – т.е. если передаете по значениюvoid func(int const& a, int *b) {} // function of other developer in Google C++ Style int* a = new int(1); int b = 2; func(*a, b);
то компилятор выдаст ошибку о том, что функция хочет изменить ваш параметр b. Это особенно важно при разработке через тестирование TDD, когда внешние вызовы тестов жестко задают формат интерфейсов, и в данном случае такой вызов во внешних тестах сказал бы разработчику функции о том, что b – изменять нельзя.
А если передаем по указателю (или со взятием адреса):void func(int const& a, int *b) {} // function of other developer in Google C++ Style int* a = new int(1); int b = 2; func(*a, &b);
то скомпилируется без ошибок. И нам даже из вызова функции очевидно, что функцией переменная
a
не изменяется, а переменнаяb
изменяется. А в случае применения TDD мы говорим о том, что разработчик должен приниматьb
по указателю, а значит и должен изменять её. Иa
он должен будет принять по константной ссылке или значению, и не сможет менять её внешнее значение.
Но в C, где нет ссылок, такой подход не возможен, т.к. если функция всегда принимает только по указателю, то на стороне вызывающего нельзя гарантировать невозможность их модификации, а передача по значению переменных пользовательского типа может иметь значительные накладные расходы.
4. Заключение
Вот полностью рабочий вариант этого решения на C++: GitHub.com
У меня на GCC4.7.2 с ключами –O3 –march=native, CPUCore i5 K750 и размером exe-файла в 74КБ результат такой:
Generated rows: 10000000
C++-Searching…
C++-optimized search took 0.061000 seconds.
Found rows: 38
C-Searching…
C-search took 0.089000 seconds.
The C++ faster than C: 1.459016 times
Found rows: 38
А на MSVC11(MSVS2012) с ключами /O2 /Ob2 /Oi, CPU Core i5 K750 и размером exe-файла в 138КБ получился результат:
Generated rows: 10000000
C++-Searching…
C++-optimized search took 0.056000 seconds.
Found rows: 38
C-Searching…
C-search took 0.074000 seconds.
The C++ faster than C: 1.321429 times
Found rows: 38
Как мы видим время исполнения упало с 74мс, до 56мс, т.е. скорость выросла в 1.3 раза. В принципе не плохо.
Всего в 1.3 раза? А как насчет ускорения в 3.5 – 5.3 раза для поиска полным проходом, есть идеи?
Вывод – чем больше компилятору известно во время компиляции, тем лучше он сможет оптимизировать программу. А в этом ему как ничто другое помогают шаблоны (templates).
К слову, эта оптимизация не применима в Java и C#, т.к. в generics невозможно использовать параметром значение, а не тип.
В следующей статье хардкорное решение с ускорением в 3.5 – 5.3 и все ещё без индексов. Но использоваться решение будет далее и при индексном поиске.
ссылка на оригинал статьи http://habrahabr.ru/post/182356/
Добавить комментарий