Использование алгоритмов без блокировки всегда было чем-то пугающим для разработчика. Очень трудно представить себе организацию доступа к данным без блокировки, таким образом, чтобы два или более потока не могли одновременно обрабатывать один и тот же блок данных. Большинство разработчиков используют стандартные контейнеры типа стеков или связных списков без блокировки, но не более того. В этой же статье я хотел бы рассказать, как организовать доступ к данным в многопоточной среде без блокировки.
Основная идея такого метода заключается в том, что каждый поток использует отдельный буфер, в который копирует данные из основного буфера, обрабатывает их и затем меняет местами указатель на свой буфер с указателем на основной буфер. Рассмотрим следующий код:
#include <stdio.h> struct data_s { int a; int b; }; struct data_s reader_tmp, writer_tmp, data; struct data_s *reader_tmp_ptr, *writer_tmp_ptr, *data_ptr; int done = 0; void process(struct data_s *data) { data->a++; data->b++; } void* writer(void* p) { struct data_s *tmp_ptr; int i; for(i = 0; i < 1000000; i++) { do { tmp_ptr = data_ptr; writer_tmp_ptr->a = tmp_ptr->a; writer_tmp_ptr->b = tmp_ptr->b; process(writer_tmp_ptr); } while(!__sync_compare_and_swap(&data_ptr, tmp_ptr, writer_tmp_ptr)); writer_tmp_ptr = tmp_ptr; } } void* reader(void *p) { struct data_s *tmp_ptr; int a, b; while(!done) { do { tmp_ptr = data_ptr; reader_ptr->a = tmp_ptr->a; reader_ptr->b = tmp_ptr->b; a = tmp_ptr->a; b = tmp_ptr->b; } while(!__sync_bool_compare_and_swap(&data_ptr, tmp_ptr, reader_tmp_ptr)); reader_tmp_ptr = tmp_ptr; printf(“data = {%d, %d}\n”, a, b); } } int main() { pthread_t reader_thread, writer_thread; data.a = 0; data.b = 0; data_ptr = &data; writer_tmp_ptr = &writer_tmp; reader_tmp_ptr = &reader_tmp; pthread_create(&read_thread, NULL, reader, NULL); pthread_create(&write_thread, NULL, writer, NULL); pthread_join(write_thread, NULL); done = 1; pthread_join(read_thread, NULL); return 0; }
В приведённом коде данные перед обработкой копируются из буфера, на который указывает data_ptr в буфер, на который указывает writer_tmp_ptr. А затем эти указатели меняются местами. Причём в data_ptr writer_tmp_ptr записывается с использованием атомарной операции compare_and_swap, которая сравнивает первый аргумент со вторым и если они совпадают, то записывает третий аргумент в первый и возвращает true. Иначе, возвращает false. Для чего это нужно? Рассмотрим на примере функции reader. Допустим поток, выполняющий эту функцию приостановился после строчки a = tmp_ptr->a; В этот момент, tmp_ptr указывает на data. Тут же начал работать поток, выполняющий функцию writer. Выполнив первую итерацию он поменял местами writer_tmp_ptr и data_ptr и начал следующую итерацию, остановившись после строчки data->b++; В данной ситуации writer_tmp_ptr указывает на data и tmp_ptr в функции reader указывает на data. Получается одновременное чтение и модификация одного и того же буфера. Но так как указатели data_ptr и tmp_ptr уже не совпадают, то операция compare_and_swap обнаружит коллизию и выполнит операцию чтения ещё раз. Почему же присваивание reader_tmp_ptr = tmp_ptr не проходит такую проверку?
Всё просто. Переменная reader_tmp_ptr является специфичной переменной для потока, в котором она выполняется. В данном примере я сделал её глобальной, что не совсем правильно, т.к. в случае, с несколькими читающими потоками, пришлось бы заводить ещё одну глобальную переменную для второго потока, и внутри функции определять, какой поток в данный момент выполняется, чтоб использовать ту или иную переменную в качестве уникального для потока указателя на буфер. Оптимальный вариант — это использование т.н. специфичные для потока переменные. Например, библиотека pthread имеет такие замечательные функции, как pthread_getspecific/pthread_setspecific. Целью же написания этого кода было наглядно показать читателю, как работает данный алгоритм. Без оптимизаций, которые могут только запутать представление о самой сути.
Казалось бы, всё идеально, программа должна выводить на экран пары одинаковых значений, но не так всё просто. Представим также, что поток, выполняющий функцию reader, остановился после строчки a = tmp_ptr->a; после чего, поток, выполняющий функцию writer, завершил 2 итерации и выполняет третью. Остановившись после завершения функции process. Далее поток, выполняющий функцию reader возобновляет свою работу. В этой ситуации значения переменных a и b не совпадут, но операция compare_and_swap вернёт true, т.к. data_ptr снова указвает на data, другими словами data_ptr и tmp_ptr снова совпадают. Это называется проблема ABA. Одним из способов решения этой проблемы, является добавление к указателю счётчика, который увеличивается каждый раз, когда ему присваивается новое значение. В следующем примере такая проблема отсутствует.
include <stdio.h> struct data_s { int a; int b; }; struct data_pointer_s { union { uint64_t qw[2]; struct { struct data_s *data_ptr; uint64_t aba_counter; }; }; }; static inline char cas128bit(volatile struct data_pointer_s *a, struct data_pointer_s b, struct data_pointer_s_struct c) { char result; __asm__ __volatile__( "lock cmpxchg16b %1\n\t" "setz %0\n" : "=q" (result) , "+m" (a->qw) : "a" (b.data_ptr), "d" (b.aba_counter) , "b" (c.data_ptr), "c" (c.aba_counter) : "cc" ); return result; } struct data_s reader_tmp, writer_tmp, data; struct data_pointer_s reader_tmp_ptr, writer_tmp_ptr, data_ptr; int done = 0; void process(struct data_s *data) { data->a++; data->b++; } void* writer(void* p) { struct data_pointer_s tmp_ptr; int i; for(i = 0; i < 1000000; i++) { do { tmp_ptr = data_ptr; writer_tmp_ptr.data_ptr->a = tmp_ptr.data_ptr->a; writer_tmp_ptr.data_ptr->b = tmp_ptr.data_ptr->b; process(writer_tmp_ptr.data_ptr); writer_tmp_ptr.counter = tmp_ptr.counter + 1; } while(!cas128bit(&data_ptr, tmp_ptr, writer_tmp_ptr)); writer_tmp_ptr = tmp_ptr; } } void* reader(void *p) { struct data_pointer_s tmp_ptr; int a, b; while(!done) { do { tmp_ptr = data_ptr; reader_ptr.data_ptr->a = tmp_ptr.data_ptr->a; reader_ptr.data_ptr->b = tmp_ptr.data_ptr->b; a = tmp_ptr.data_ptr->a; b = tmp_ptr.data_ptr->b; reader_ptr.counter = tmp_ptr.counter + 1; } while(!cas128bit(&data_ptr, tmp_ptr, reader_ptr)); reader_tmp_ptr = tmp_ptr; printf(“data = {%d, %d}\n”, a, b); } } int main() { pthread_t reader_thread, writer_thread; data.a = 0; data.b = 0; data_ptr.data_ptr = &data; data_ptr.counter = 0; writer_tmp_ptr.data_ptr = &writer_tmp; writer_tmp_ptr.counter = 0 reader_tmp_ptr.data_ptr = &reader_tmp; reader_tmp_ptr.counter = 0 pthread_create(&read_thread, NULL, reader, NULL); pthread_create(&write_thread, NULL, writer, NULL); pthread_join(write_thread, NULL); done = 1; pthread_join(read_thread, NULL); return 0; }
Следует отметить, что эффективность данного кода зависит от объёма копируемых данных и от сложности функции process. Если требуется атомарная обработка блоков данных, объёмами в несколько десятков мегабайт, то использование мьютексов было бы намного эффективнее. Также неплохо было бы рассмотреть возможность добавления небольшой задержки(порядка нескольких микросекунд) каждый раз после того как compare_and_swap возвращает false, чтоб дать другому потоку возможность закончить операцию. Опять-таки, наличие задержки, и время напрямую будут зависить от специфики выполняемой задачи.
Отдельно хотелось бы выразить благодарность пользователю vladvic за помощь в понимании и представлении того, как действует данный алгорим.
ссылка на оригинал статьи http://habrahabr.ru/post/271583/
Добавить комментарий