Невероятно быстрый подсчёт байтов

от автора

Оказалось, что тема суммирования целых чисел в кодировке ASCII в Haswell со скоростью memcpy гораздо популярнее, чем я мог ожидать. Именно поэтому я решил поучаствовать и в другом челлендже в жанре HighLoad: подсчёт uint8. В настоящее время я занимаю всего лишь 13 место в списке лидеров, проигрываю первому месту около 7%, но уже узнал немало интересного. В этом посте я полностью опишу моё решение, в том числе, удивительный паттерн считывания из памяти. Используя его, можно примерно до 30% (по сравнению с обычным последовательным доступом) повысить скорость передачи в контексте одноядерных рабочих нагрузок, ограниченных размером кэша. По-видимому, этот метод малоизвестен.

Как и в других постах автора, программа настроена для следующих входных характеристик высоконагруженной системы: Intel Xeon E3-1271 v3 @ 3,60 ГГц, ОЗУ 512 МБ, Ubuntu 20.04. В ней используется только AVX2, а AVX512 не используется.

Задача

«Выведите на экран, сколько байт соответствует значению 127 в файле размером 250 МБ, который полон байт, равномерно выбранных из диапазона [0, 255] и отправленных в стандартный вывод»

К следующему просто нечего добавить! Решение, которое мы представим, работает в 550 раз быстрее следующей тривиальной программы.

uint64_t count = 0; for (uint8_t v; std::cin >> v;) {     if (v == 127) {         ++count;     } }  std::cout << count << std::endl; return 0;

Ядро

Весь исходный код решения приведён в конце этого поста. Но сначала я пошагово объясню, как он работает. Ядро состоит всего из трёх инструкций, поэтому сразу перехожу к блоку __asm__ (извините!).

; rax — это основание ввода ; rsi — это смещение до актуального фрагмента vmovntdqa    (%rax, %rsi, 1), %ymm4 ; ymm2 — это вектор, заполненный 127 vpcmpeqb     %ymm4, %ymm2, %ymm4 ; ymm6 — это аккумулятор, байты которого соответствуют  ; текущему счёту экземпляров 127 ; на данной позиции во входном фрагменте  vpsubb       %ymm4, %ymm6, %ymm6

При помощи этого кода мы перебираем 32-байтные фрагменты ввода и:

  • Загружаем фрагмент с vmovntdqa (это инструкция перемещения, записываемая в память, минуя кэш, вставлена только в стилистических целях и во время выполнения роли не играет).

  • Каждый байт во фрагменте сравниваем со 127 при помощи vpcmpeqb, что даёт нам 0xFF (оно же -1), и этот байт соответствует 127, а все остальные — 0x00. Например,[125, 126, 127, 128, …] принимает вид [0, 0, -1, 0, …].

  • Вычитаем результат сравнения из аккумулятора. Продолжая предыдущий пример и предполагая, что аккумулятор у нас заполнен нулями, получаем [0, 0, 1, 0, …].

Затем, чтобы не допустить переполнения этого узкого аккумулятора, мы время от времени дампируем его в более широкий при помощи следующего кода:

; ymm1 — это нулевой вектор ; ymm6 — это узкий аккумулятор vpsadbw      %ymm1,%ymm6,%ymm6 ; ymm3 — это широкий аккумулятор vpaddq       %ymm3,%ymm6,%ymm3

vpsadbw суммирует в аккумуляторе каждые восемь байт, получая из них четыре 64-разрядных числа, после чего vpadddq суммирует результат с более широким аккумулятором, в котором переполнения гарантированно не произойдёт. В конце работы мы извлекаем результат, чтобы получить окончательный счёт.

Пока ничего экстраординарного. На самом деле, именно такой подход описан в следующей дискуссии на StackOverflow: How to count character occurrences using SIMD.

Начинается волшебство

Сложность этой задачи в том, что вычислений в её рамках очень мало, но они очень ограничены по памяти. Я проштудировал мануал по оптимизации от Intel (там полно опечаток) в поисках нужных мне данных о памяти, пока на странице 788 не встретил рассказ о 4 аппаратных префетчерах (механизмах предвыборки инструкций). Создавалось впечатление, как будто три из них полезны только при последовательном доступе (которым я уже занимался), но в одном, который называется «Streamer», нашёлся интересный нюанс:

«Фиксирует и ведёт до 32 потоков операций доступа к данным. Для каждой 4-килобайтной страницы можно вести один прямой и один обратный поток».

«Для каждой 4-килобайтной страницы». Улавливаете суть? Можно не обрабатывать последовательно весь вывод, а перемежать обработку 4-килобайтных страниц, следующих друг за другом. Также мы немного разматываем ядро и обрабатываем в каждом блоке целую кэш-линию (2×32 байт).

#define BLOCK(offset) \     "vmovntdqa    " #offset " * 4096 (%6, %2, 1), %4\n\t" \     "vpcmpeqb     %4, %7, %4\n\t" \     "vmovntdqa    " #offset " * 4096 + 0x20 (%6, %2, 1), %3\n\t" \     "vpcmpeqb     %3, %7, %3\n\t" \     "vpsubb       %4, %0, %0\n\t" \     "vpsubb       %3, %1, %1\n\t" \

8 из них мы помещаем в главный цикл, где offset устанавливается в размере от 0 до 7 включительно.

В таком случае балл на HighLoad увеличивается примерно на 15%, но, если ваше ядро ещё сильнее ограничено по памяти — допустим, вы просто складываете байты при помощи vpaddb, чтобы найти их сумму по модулю 255, на этом можно выиграть до 30%. Впечатляет, учитывая, насколько это простое изменение!

В любом случае, есть ещё одна маленькая деталь: мы добавляем предвыборку четырёх ближайших кэш-линий:

#define BLOCK(offset) \     "vmovntdqa    " #offset " * 4096 (%6, %2, 1), %4\n\t" \     "vpcmpeqb     %4, %7, %4\n\t" \     "vmovntdqa    " #offset " * 4096 + 0x20 (%6, %2, 1), %3\n\t" \     "vpcmpeqb     %3, %7, %3\n\t" \     "vpsubb       %4, %0, %0\n\t" \     "vpsubb       %3, %1, %1\n\t" \     "prefetcht0   " #offset " * 4096 + 4 * 64 (%6, %2, 1)\n\t"

Почему именно 4 кэш-линии? Не могу внятно ответить на этот вопрос, просто так работает лучше. На следующем графике показано, как исполняется программа при таком решении, причём, в графике заложены шаги предвыборки от 0 до 100 на иной системе (вот почему оптимум здесь сдвинут). Как видите, кривая довольно сложна.

Вверху: минимальное время выполнения в сравнении с шагом предвыборки. Внизу: шаг предвыборки

Вверху: минимальное время выполнения в сравнении с шагом предвыборки. Внизу: шаг предвыборки

Исходный код

#include <iostream> #include <cstdint> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> #include <immintrin.h> #include <cassert>  #define BLOCK_COUNT 8 #define PAGE_SIZE 4096 #define TARGET_BYTE 127  #define BLOCKS_8 \     BLOCK(0)  BLOCK(1)  BLOCK(2)  BLOCK(3) \     BLOCK(4)  BLOCK(5)  BLOCK(6)  BLOCK(7)  #define BLOCK(offset) \     "vmovntdqa    " #offset "*4096(%6,%2,1),%4\n\t" \     "vpcmpeqb     %4,%7,%4\n\t" \     "vmovntdqa    " #offset "*4096+0x20(%6,%2,1),%3\n\t" \     "vpcmpeqb     %3,%7,%3\n\t" \     "vpsubb       %4,%0,%0\n\t" \     "vpsubb       %3,%1,%1\n\t" \     "prefetcht0   " #offset "*4096+4*64(%6,%2,1)\n\t"   static inline __m256i hsum_epu8_epu64(__m256i v) {     return _mm256_sad_epu8(v, _mm256_setzero_si256()); }  int main() {     struct stat sb;     assert(fstat(STDIN_FILENO, &sb) != -1);     size_t length = sb.st_size;      char* start = static_cast<char*>(mmap(nullptr, length, PROT_READ, MAP_PRIVATE | MAP_POPULATE, STDIN_FILENO, 0));     assert(start != MAP_FAILED);      uint64_t count = 0;     __m256i sum64 = _mm256_setzero_si256();     size_t offset = 0;      __m256i compare_value = _mm256_set1_epi8(TARGET_BYTE);     __m256i acc1 = _mm256_set1_epi8(0);     __m256i acc2 = _mm256_set1_epi8(0);     __m256i temp1, temp2;      while (offset + BLOCK_COUNT*PAGE_SIZE <= length) {         int batch = PAGE_SIZE / 64;         asm volatile(             ".align 16\n\t"             "0:\n\t"              BLOCKS_8              "add          $0x40, %2\n\t"             "dec          %5\n\t"             "jg           0b"             : "+x" (acc1), "+x" (acc2), "+r" (offset), "+x" (temp1), "+x" (temp2), "+r" (batch)             : "r" (start), "x" (compare_value)             : "cc", "memory"         );          offset += (BLOCK_COUNT - 1)*PAGE_SIZE;          sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc1));         sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc2));          acc1 = _mm256_set1_epi8(0);         acc2 = _mm256_set1_epi8(0);     }      sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc1));     sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(acc2));      count += _mm256_extract_epi64(sum64, 0);     count += _mm256_extract_epi64(sum64, 1);     count += _mm256_extract_epi64(sum64, 2);     count += _mm256_extract_epi64(sum64, 3);      for (; offset < length; ++offset) {         if (start[offset] == TARGET_BYTE) {             ++count;         }     }      std::cout << count << std::endl;     return 0; }

Заключение

Удивительно, насколько обойдён вниманием паттерн с использованием перемежающихся страниц. Насколько помню, никогда не встречал его в реальной практике. Любопытно! Если вам доводилось с ним сталкиваться, расскажите об этом. А если я забыл ещё о каких-то вариантах оптимизации памяти, сообщите о них тоже.


ссылка на оригинал статьи https://habr.com/ru/articles/856334/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *