Атомарная обработка блоков данных без блокировки

от автора

Использование алгоритмов без блокировки всегда было чем-то пугающим для разработчика. Очень трудно представить себе организацию доступа к данным без блокировки, таким образом, чтобы два или более потока не могли одновременно обрабатывать один и тот же блок данных. Большинство разработчиков используют стандартные контейнеры типа стеков или связных списков без блокировки, но не более того. В этой же статье я хотел бы рассказать, как организовать доступ к данным в многопоточной среде без блокировки.

Основная идея такого метода заключается в том, что каждый поток использует отдельный буфер, в который копирует данные из основного буфера, обрабатывает их и затем меняет местами указатель на свой буфер с указателем на основной буфер. Рассмотрим следующий код:

#include <stdio.h>  struct data_s {   int a;   int b; }; struct data_s reader_tmp, writer_tmp, data; struct data_s *reader_tmp_ptr, *writer_tmp_ptr, *data_ptr; int done = 0;  void process(struct data_s *data) {   data->a++;   data->b++; } void* writer(void* p) {   struct data_s *tmp_ptr;   int i;   for(i = 0; i < 1000000; i++) {     do {       tmp_ptr = data_ptr;       writer_tmp_ptr->a = tmp_ptr->a;       writer_tmp_ptr->b = tmp_ptr->b;       process(writer_tmp_ptr);     } while(!__sync_compare_and_swap(&data_ptr, tmp_ptr, writer_tmp_ptr));     writer_tmp_ptr = tmp_ptr;   } }  void* reader(void *p) {   struct data_s *tmp_ptr;   int a, b;   while(!done) {     do {       tmp_ptr = data_ptr;       reader_ptr->a = tmp_ptr->a;       reader_ptr->b = tmp_ptr->b;       a = tmp_ptr->a;       b = tmp_ptr->b;     } while(!__sync_bool_compare_and_swap(&data_ptr, tmp_ptr, reader_tmp_ptr));     reader_tmp_ptr = tmp_ptr;     printf(“data = {%d, %d}\n”, a, b);   } }  int main() {   pthread_t reader_thread, writer_thread;   data.a = 0;   data.b = 0;   data_ptr = &data;   writer_tmp_ptr = &writer_tmp;   reader_tmp_ptr = &reader_tmp;   pthread_create(&read_thread, NULL, reader, NULL);   pthread_create(&write_thread, NULL, writer, NULL);   pthread_join(write_thread, NULL);   done = 1;   pthread_join(read_thread, NULL);   return 0; } 

В приведённом коде данные перед обработкой копируются из буфера, на который указывает data_ptr в буфер, на который указывает writer_tmp_ptr. А затем эти указатели меняются местами. Причём в data_ptr writer_tmp_ptr записывается с использованием атомарной операции compare_and_swap, которая сравнивает первый аргумент со вторым и если они совпадают, то записывает третий аргумент в первый и возвращает true. Иначе, возвращает false. Для чего это нужно? Рассмотрим на примере функции reader. Допустим поток, выполняющий эту функцию приостановился после строчки a = tmp_ptr->a; В этот момент, tmp_ptr указывает на data. Тут же начал работать поток, выполняющий функцию writer. Выполнив первую итерацию он поменял местами writer_tmp_ptr и data_ptr и начал следующую итерацию, остановившись после строчки data->b++; В данной ситуации writer_tmp_ptr указывает на data и tmp_ptr в функции reader указывает на data. Получается одновременное чтение и модификация одного и того же буфера. Но так как указатели data_ptr и tmp_ptr уже не совпадают, то операция compare_and_swap обнаружит коллизию и выполнит операцию чтения ещё раз. Почему же присваивание reader_tmp_ptr = tmp_ptr не проходит такую проверку?

Всё просто. Переменная reader_tmp_ptr является специфичной переменной для потока, в котором она выполняется. В данном примере я сделал её глобальной, что не совсем правильно, т.к. в случае, с несколькими читающими потоками, пришлось бы заводить ещё одну глобальную переменную для второго потока, и внутри функции определять, какой поток в данный момент выполняется, чтоб использовать ту или иную переменную в качестве уникального для потока указателя на буфер. Оптимальный вариант — это использование т.н. специфичные для потока переменные. Например, библиотека pthread имеет такие замечательные функции, как pthread_getspecific/pthread_setspecific. Целью же написания этого кода было наглядно показать читателю, как работает данный алгоритм. Без оптимизаций, которые могут только запутать представление о самой сути.

Казалось бы, всё идеально, программа должна выводить на экран пары одинаковых значений, но не так всё просто. Представим также, что поток, выполняющий функцию reader, остановился после строчки a = tmp_ptr->a; после чего, поток, выполняющий функцию writer, завершил 2 итерации и выполняет третью. Остановившись после завершения функции process. Далее поток, выполняющий функцию reader возобновляет свою работу. В этой ситуации значения переменных a и b не совпадут, но операция compare_and_swap вернёт true, т.к. data_ptr снова указвает на data, другими словами data_ptr и tmp_ptr снова совпадают. Это называется проблема ABA. Одним из способов решения этой проблемы, является добавление к указателю счётчика, который увеличивается каждый раз, когда ему присваивается новое значение. В следующем примере такая проблема отсутствует.

include <stdio.h>  struct data_s {   int a;   int b; };  struct data_pointer_s {   union {     uint64_t qw[2];     struct {       struct data_s *data_ptr;       uint64_t aba_counter;     };   }; };  static inline char cas128bit(volatile struct data_pointer_s *a, struct data_pointer_s b, struct data_pointer_s_struct c) {   char result;   __asm__ __volatile__(     "lock cmpxchg16b %1\n\t"     "setz %0\n"     : "=q" (result)     , "+m" (a->qw)     : "a" (b.data_ptr), "d" (b.aba_counter)     , "b" (c.data_ptr), "c" (c.aba_counter)     : "cc"   );   return result; } struct data_s reader_tmp, writer_tmp, data; struct data_pointer_s reader_tmp_ptr, writer_tmp_ptr, data_ptr; int done = 0;  void process(struct data_s *data) {   data->a++;   data->b++; }  void* writer(void* p) {   struct data_pointer_s tmp_ptr;   int i;   for(i = 0; i < 1000000; i++) {     do {       tmp_ptr = data_ptr;       writer_tmp_ptr.data_ptr->a = tmp_ptr.data_ptr->a;       writer_tmp_ptr.data_ptr->b = tmp_ptr.data_ptr->b;       process(writer_tmp_ptr.data_ptr);       writer_tmp_ptr.counter = tmp_ptr.counter + 1;     } while(!cas128bit(&data_ptr, tmp_ptr, writer_tmp_ptr));     writer_tmp_ptr = tmp_ptr;   } }  void* reader(void *p) {   struct data_pointer_s tmp_ptr;   int a, b;   while(!done) {     do {       tmp_ptr = data_ptr;       reader_ptr.data_ptr->a = tmp_ptr.data_ptr->a;       reader_ptr.data_ptr->b = tmp_ptr.data_ptr->b;       a = tmp_ptr.data_ptr->a;       b = tmp_ptr.data_ptr->b;       reader_ptr.counter = tmp_ptr.counter + 1;     } while(!cas128bit(&data_ptr, tmp_ptr, reader_ptr));     reader_tmp_ptr = tmp_ptr;     printf(“data = {%d, %d}\n”, a, b);   } }  int main() {   pthread_t reader_thread, writer_thread;      data.a = 0;   data.b = 0;   data_ptr.data_ptr = &data;   data_ptr.counter = 0;   writer_tmp_ptr.data_ptr = &writer_tmp;   writer_tmp_ptr.counter = 0   reader_tmp_ptr.data_ptr = &reader_tmp;   reader_tmp_ptr.counter = 0      pthread_create(&read_thread, NULL, reader, NULL);   pthread_create(&write_thread, NULL, writer, NULL);    pthread_join(write_thread, NULL);   done = 1;   pthread_join(read_thread, NULL);   return 0; } 

Следует отметить, что эффективность данного кода зависит от объёма копируемых данных и от сложности функции process. Если требуется атомарная обработка блоков данных, объёмами в несколько десятков мегабайт, то использование мьютексов было бы намного эффективнее. Также неплохо было бы рассмотреть возможность добавления небольшой задержки(порядка нескольких микросекунд) каждый раз после того как compare_and_swap возвращает false, чтоб дать другому потоку возможность закончить операцию. Опять-таки, наличие задержки, и время напрямую будут зависить от специфики выполняемой задачи.

Отдельно хотелось бы выразить благодарность пользователю vladvic за помощь в понимании и представлении того, как действует данный алгорим.

ссылка на оригинал статьи http://habrahabr.ru/post/271583/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *