Не секрет, что иногда выделение памяти требует отдельных решений. Например — когда память выделяется и освобождается стремительным
В результате стандартный консервативный аллокатор выстраивает все запросы в очередь на pthread_mutex / critical section. И наш многоядерный процессор медленно и печально едет на первой передаче.
И что с этим делать? Познакомимся поближе с деталями реализации метода Scalable Lock-Free Dynamic Memory Allocation. Maged M. Michael. IBM Thomas J. Watson Research Center.
Самый простой код что я сумел найти — написан под LGPL камрадами Scott Schneider и Christos Antonopoulos. Его и рассмотрим.
Начнем издалека
Итак — как нам избавиться от ненужных локов?
Ответ лежит на поверхности — надо выделять память в lock-free списках. Хорошая идея — но как строятся такие списки?
На помощь нам спешат атомарные операции. Те самые, которые InterlockedCompareExchange. Но постойте — максимум на что мы можем рассчитывать это long long, он же __int64. И что делать? А вот что — определим свой собственный указатель с тегом.
Ограничив размер адреса в 46 бит, и мы можем в 64бит целое спрятать нужные нам дополнения, а они — понадобятся далее.
#pragma pack(1) typedef struct { volatile unsigned __int64 top:46, ocount:18; } top_aba_t;
Кстати, с учетом выравнивания по 8 / 16 байт — мы можем получить не 2 в 46 степени, а несколько больше. Стандартный прием — выданный адрес не может быть нечетным и более того — должен быть выровнен для плавающей точки.
И еще момент — код становится очень длинным. То есть стандартная портянка
desc->Next = queue_head; queue_head = desc;
превращается вот в такие макароны
descriptor_queue old_queue, new_queue; do { old_queue = queue_head; desc->Next = (descriptor*)old_queue.DescAvail; new_queue.DescAvail = (unsigned __int64)desc; new_queue.tag = old_queue.tag + 1; } while (!compare_and_swap64k(queue_head, old_queue, new_queue));
что сильно удлиняет код и делает его менее читабельным. Поэтому очевидные вещи убраны под спойлеры.
Lock-free FIFO queue
Раз у нас теперь есть собственный указатель, можем заняться списком.
// Pseudostructure for lock-free list elements. // The only requirement is that the 5th-8th byte of // each element should be available to be used as // the pointer for the implementation of a singly-linked // list. struct queue_elem_t { char *_dummy; volatile queue_elem_t *next; };
И в нашем случае — не забыть про выравнивание, например так
typedef struct { unsigned __int64 _pad0[8]; top_aba_t both; unsigned __int64 _pad1[8]; } lf_fifo_queue_t;
Оборачиваем работу с атомарными функциями
Теперь определим парочку абстракций, чтобы наш код мог быть переносим (для Win32 например это реализуется вот так):
#define fetch_and_store(address, value) InterlockedExchange((PLONG)(address), (value)) #define atmc_add(address, value) InterlockedExchangeAdd((PLONG)(address), (value)) #define compare_and_swap32(address, old_value, new_value) \ (InterlockedCompareExchange(\ (PLONG)(address), (new_value), (old_value))\ == (old_value)) #define compare_and_swap64(address, old_value, new_value) \ (InterlockedCompareExchange64(\ (PLONGLONG)(address), (__int64)(new_value), (__int64)(old_value)) \ == (__int64)(old_value)) #define compare_and_swap_ptr(address, old_value, new_value) \ (InterlockedCompareExchangePointer((address), \ (new_value), (old_value)) \ == (old_value))
и добавим еще один метод, просто чтобы не отвлекаться на кастинг параметров к __int664 и передаче их по указателям.
#define compare_and_swap64k(a,b,c) \ compare_and_swap64((volatile unsigned __int64*)&(a), \ *((unsigned __int64*)&(b)), \ *((unsigned __int64*)&(c)))
И вот мы уже готовы реализовать базовые функции (добавить и удалить).
Определяем базовые функции работы со списком
static inline int lf_fifo_enqueue(lf_fifo_queue_t *queue, void *element) { top_aba_t old_top; top_aba_t new_top; new_top.ocount = 0; for(;;) { old_top.ocount = queue->both.ocount; old_top.top = queue->both.top; ((queue_elem_t *)element)->next = (queue_elem_t *)old_top.top; new_top.top = (unsigned __int64)element; new_top.ocount += 1; if (compare_and_swap64k(queue->both, old_top, new_top)) return 0; } }
На что тут надо обратить внимание — обычная операция по добавлению обернута циклом, выход из которого — мы успешно записали новое значение поверх старого, и при этом старое кто-то еще не поменял. Ну а если поменял — то повторяем все заново. И еще момент — в наш ocount записываем номер попытки, с которой нам это удалось. Пустячок, а каждая попытка выдает уникальное 64бит целое.
Именно на таком простом шаманстве и строятся lock-free структуры данных.
Аналогичным образом реализуется снятие с вершины нашего FIFO списка:
static inline void *lf_fifo_dequeue(lf_fifo_queue_t *queue) { top_aba_t head; top_aba_t next; next.ocount = 0; for(;;) { head.top = queue->both.top; head.ocount = queue->both.ocount; if (head.top == 0) return NULL; next.top = (unsigned __int64)(((struct queue_elem_t *)head.top)->next); next.ocount += 1; if (compare_and_swap64k(queue->both, head, next)) return ((void *)head.top); } }
Тут мы видим ровно то же самое — в цикле пробуем снять, если есть что и мы снимаем так что старое значение все еще правильное — то отлично, а нет — пробуем еще.
И конечно же инициализация такого элемента списка банальна — вот она:
static inline void lf_fifo_queue_init(lf_fifo_queue_t *queue) { queue->both.top = 0; queue->both.ocount = 0; }
Собственно идея аллокатора
Перейдем теперь непосредственно к аллокатору. Аллокатор должен быть быстрым — поэтому разделим память на классы. Большой (напрямую берем у системы), и маленький, побитый на много-много маленьких подклассов размером от 8 байт до 2 килобайт.
Такой ход конем позволит нам решить две задачи. Первая — маленькие кусочки будут выделяться супербыстро, и за счет того что они разделены в таблицу — не будут лежать в одном огромном списке. А большие куски памяти не будут мешаться под ногами и приводить к проблемам с фрагментацией. Более того — поскольку в каждом подклассе у нас все блоки одного размера, работа с ними очень упрощается.
И еще момент! Выделение маленьких кусочков привяжем к нитям (а освобождение — нет). Таким образом мы убьем двух зайцев сразу — упростится контроль за выделением, и острова памяти выделенные для треда локально — не будут перемешиваться лишний раз.
Опишем хип и дескриптор блока
Помолясь богине Техно, приступим.
Определим парочку констант
struct Descriptor; typedef struct Descriptor descriptor; struct Procheap; typedef struct Procheap procheap; #define TYPE_SIZE sizeof(void*) #define PTR_SIZE sizeof(void*) #define HEADER_SIZE (TYPE_SIZE + PTR_SIZE) #define LARGE 0 #define SMALL 1 #define PAGESIZE 4096 #define SBSIZE (16 * PAGESIZE) #define DESCSBSIZE (1024 * sizeof(descriptor)) #define ACTIVE 0 #define FULL 1 #define PARTIAL 2 #define EMPTY 3 #define MAXCREDITS 64 // 2^(bits for credits in active) #define GRANULARITY 8
И займемся креативом, определим нужные типы данных. Итак, у нас будет много хипов — каждый в своем классе, да привязанных к текущему треду. В суперблоке будет два указателя — активный список и перераспределенный.
Вы спросите — а что это такое и почему так сложно?
Первое, оно же главное — выделять список поэлементно банально невыгодно. То есть — классический однонаправленный список при миллионах элементов превращается в ад и израиль. На каждый элемент списка надо выделить 8 / 16 байт, чтобы хранить два несчастных указателя на сами данные и на следующий элемент списка.
Выгодно ли это? Очевидно что — нет! И как следует поступить? Правильно, а давайте мы сгруппируем наши описатели списка в группы (stripe) по 500 (к примеру) элементов. И получим список не элементов, но — групп. Экономично, практично, работать с элементами можно как и в классическом варианте. Весь вопрос только в нестандартном выделении памяти.
Более того, все Next в пределах блока просто указывают на соседний элемент массива, и мы можем их явно проинициализировать сразу при выделении страйпа. По факту, последний Next в страйпе будет указывать на следующий страйп, но с точки зрения работы со списками — ничего не меняется.
Легко догадаться, что списки memory block descriptors строятся именно так.
И еще — Active и есть наш активный страйп с заранее выделенными кусочками памяти по эн байт, в котором и ведем выдачу памяти по принципу FIFO. Если есть место в страйпе — берем оттуда. Если нет — ищем уже в классическом списке Partial. Если нет ни там ни там — отлично, выделяем новый страйп.
Второе, такая «полосатость» ведет к некоторому перерасходу памяти, т. к. мы можем выделить страйп под 8 байтные кусочки памяти в виде массива в 64К, а запрошено будет пара кусочков всего. И еще, активные страйпы для каждой нити свои, что еще больше усугубит проблему.
Однако, если у нас действительно идет активная работа с памятью, это даст хороший выигрыш по скорости.
Вот сам хип
struct Procheap { volatile active Active; // initially NULL volatile descriptor* Partial; // initially NULL sizeclass* sc; // pointer to parent sizeclass };
А вот что ему надо:
typedef struct { unsigned __int64 ptr:58, credits:6; } active; /* We need to squeeze this in 64-bits, but conceptually * this is the case: * descriptor* DescAvail; */ typedef struct { unsigned __int64 DescAvail:46, tag:18; } descriptor_queue; /* Superblock descriptor structure. We bumped avail and count * to 24 bits to support larger superblock sizes. */ typedef struct { unsigned __int64 avail:24,count:24, state:2, tag:14; } anchor; typedef struct { lf_fifo_queue_t Partial; // initially empty unsigned int sz; // block size unsigned int sbsize; // superblock size } sizeclass;
и собственно дескриптор — описатель нашего фрагмента.
struct Descriptor { struct queue_elem_t lf_fifo_queue_padding; volatile anchor Anchor; // anchor to superblock exact place descriptor* Next; // next element in list void* sb; // pointer to superblock procheap* heap; // pointer to owner procheap unsigned int sz; // block size unsigned int maxcount; // superblock size / sz };
Как видно — ничего сверхъестественного. Описание хипа и суперблока нам нужны в дескрипторе, т. к. выделять память можно в одном треде, а освобождать — в другом.
Приступим к реализации
Сначала нам надо определить наши локальные переменные — хипы, размеры и прочее. Вот примерно так:
/* This is large and annoying, but it saves us from needing an * initialization routine. */ sizeclass sizeclasses[2048 / GRANULARITY] = { {LF_FIFO_QUEUE_STATIC_INIT, 8, SBSIZE}, {LF_FIFO_QUEUE_STATIC_INIT, 16, SBSIZE}, ... {LF_FIFO_QUEUE_STATIC_INIT, 2024, SBSIZE}, {LF_FIFO_QUEUE_STATIC_INIT, 2032, SBSIZE}, {LF_FIFO_QUEUE_STATIC_INIT, 2040, SBSIZE}, {LF_FIFO_QUEUE_STATIC_INIT, 2048, SBSIZE}, }; #define LF_FIFO_QUEUE_STATIC_INIT {{0, 0, 0, 0, 0, 0, 0, 0}, {0, 0}, {0, 0, 0, 0, 0, 0, 0, 0}} __declspec(thread) procheap* heaps[2048 / GRANULARITY]; // = { }; static volatile descriptor_queue queue_head;
Тут мы видим все то, что рассматривали в предыдущем разделе:
__declspec(thread) procheap* heaps[2048 / GRANULARITY]; // = { }; static volatile descriptor_queue queue_head;
Это и есть наши хипы per thread. И — список всех дескрипторов per-process.
Malloc — как это работает
Рассмотрим собственно процесс выделения памяти подробнее. Легко увидеть несколько особенностей, а именно:
- Если запрошенный размер не имеет нашего хипа для малых размеров — просто запрашиваем у системы
- Выделяем память по очереди — сначала из активного списка, затем из списка фрагментов, и наконец — пробуем запросить у системы новый кусок.
- У нас всегда есть память в системе, а если нет — надо только подождать в цикле
Вот такое решение.
void* my_malloc(size_t sz) { procheap *heap; void* addr; // Use sz and thread id to find heap. heap = find_heap(sz); if (!heap) // Large block return alloc_large_block(sz); for(;;) { addr = MallocFromActive(heap); if (addr) return addr; addr = MallocFromPartial(heap); if (addr) return addr; addr = MallocFromNewSB(heap); if (addr) return addr; } }
Углубимся в тему, рассмотрим каждую часть в отдельности. Начнем с начала — с добавления нового куска из системы.
static void* MallocFromNewSB(procheap* heap) { descriptor* desc; void* addr; active newactive, oldactive; *((unsigned __int64*)&oldactive) = 0; desc = DescAlloc(); desc->sb = AllocNewSB(heap->sc->sbsize, SBSIZE); desc->heap = heap; desc->Anchor.avail = 1; desc->sz = heap->sc->sz; desc->maxcount = heap->sc->sbsize / desc->sz; // Organize blocks in a linked list starting with index 0. organize_list(desc->sb, desc->maxcount, desc->sz); *((unsigned __int64*)&newactive) = 0; newactive.ptr = (__int64)desc; newactive.credits = min(desc->maxcount - 1, MAXCREDITS) - 1; desc->Anchor.count = max(((signed long)desc->maxcount - 1 ) - ((signed long)newactive.credits + 1), 0); // max added by Scott desc->Anchor.state = ACTIVE; // memory fence. if (compare_and_swap64k(heap->Active, oldactive, newactive)) { addr = desc->sb; *((char*)addr) = (char)SMALL; addr = (char*) addr + TYPE_SIZE; *((descriptor **)addr) = desc; return (void *)((char*)addr + PTR_SIZE); } else { //Free the superblock desc->sb. munmap(desc->sb, desc->heap->sc->sbsize); DescRetire(desc); return NULL; } }
Никаких чудес — всего лишь создание суперблока, дескриптора, и инициализация пустого списка. И добавление в список активных нового блока. Тут прошу обратить внимание на такой факт, что нет цикла с присваиванием. Если не удалось — значит не удалось. Почему так? Потому что функция и так вызывается из цикла, а если не удалось вставить в список — то это значит что вставил кто-то другой и надо проводить выделение памяти сначала.
Перейдем теперь к выделению блока из списка активных — ведь мы уже научились выделять суперблок и прочее.
static void* MallocFromActive(procheap *heap) { active newactive, oldactive; descriptor* desc; anchor oldanchor, newanchor; void* addr; unsigned __int64 morecredits = 0; unsigned long next = 0; // First step: reserve block do { newactive = oldactive = heap->Active; if (!(*((unsigned __int64*)(&oldactive)))) return NULL; if (oldactive.credits == 0) *((unsigned __int64*)(&newactive)) = 0; else --newactive.credits; } while (!compare_and_swap64k(heap->Active, oldactive, newactive)); // Second step: pop block desc = mask_credits(oldactive); do { // state may be ACTIVE, PARTIAL or FULL newanchor = oldanchor = desc->Anchor; addr = (void *)((unsigned __int64)desc->sb + oldanchor.avail * desc->sz); next = *(unsigned long *)addr; newanchor.avail = next; ++newanchor.tag; if (oldactive.credits == 0) { // state must be ACTIVE if (oldanchor.count == 0) newanchor.state = FULL; else { morecredits = min(oldanchor.count, MAXCREDITS); newanchor.count -= morecredits; } } } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor)); if (oldactive.credits == 0 && oldanchor.count > 0) UpdateActive(heap, desc, morecredits); *((char*)addr) = (char)SMALL; addr = (char*) addr + TYPE_SIZE; *((descriptor**)addr) = desc; return ((void*)((char*)addr + PTR_SIZE)); }
Собствено алгоритм прост, лишь слегка путает нас громоздкая форма записи. На самом же деле — если есть что брать, то берем новый кусок из суперблока, и отмечаем сей факт. Попутно проверяем — а не забрали ли мы последний кусочек, и если да — отмечаем и это.
Есть тут роано одна тонкость, а именно — если вдруг выяснилось, что мы взяли последний кусочек из суперблока, то вслед за нами следующий запрос приведет к добавлению нового суперблока взамен уже использованного. И как только мы это обнаружили — нам негде записать факт того что блок выделен. Поэтому мы заносим только что выделенный кусочек в partial list.
static void UpdateActive(procheap* heap, descriptor* desc, unsigned __int64 morecredits) { active oldactive, newactive; anchor oldanchor, newanchor; *((unsigned __int64*)&oldactive) = 0; newactive.ptr = (__int64)desc; newactive.credits = morecredits - 1; if (compare_and_swap64k(heap->Active, oldactive, newactive)) return; // Someone installed another active sb // Return credits to sb and make it partial do { newanchor = oldanchor = desc->Anchor; newanchor.count += morecredits; newanchor.state = PARTIAL; } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor)); HeapPutPartial(desc); }
Настала пора рассмотреть работу с дескрипторами, прежде чем мы перейдем к заключительной части этого эссе.
Memory Block Descriptors
Для начала, научимся дескриптор создавать. Но где? Вообще-то, если кто забыл — мы как раз пишем выделение памяти. Очевидным и красивым решением было бы использование тех же механизмов, что и для generic allocation, но увы — это будет всем известный анекдот pkunzip.zip. Поэтому принцип тот же — выделяем большой блок содержащий массив дескрипторов, и как только массив переполняется — создаем новвый и объединяем его с предыдущим в список.
static descriptor* DescAlloc() { descriptor_queue old_queue, new_queue; descriptor* desc; for(;;) { old_queue = queue_head; if (old_queue.DescAvail) { new_queue.DescAvail = (unsigned __int64)((descriptor*)old_queue.DescAvail)->Next; new_queue.tag = old_queue.tag + 1; if (compare_and_swap64k(queue_head, old_queue, new_queue)) { desc = (descriptor*)old_queue.DescAvail; break; } } else { desc = AllocNewSB(DESCSBSIZE, sizeof(descriptor)); organize_desc_list((void *)desc, DESCSBSIZE / sizeof(descriptor), sizeof(descriptor)); new_queue.DescAvail = (unsigned long)desc->Next; new_queue.tag = old_queue.tag + 1; if (compare_and_swap64k(queue_head, old_queue, new_queue)) break; munmap((void*)desc, DESCSBSIZE); } } return desc; }
Ну а теперь дело за не менее мощным колдунством — надо еще и научиться дескриптор обратно возвращать. Однако, возвращать мы будем в том же FIFO — потому что возвращать нам понадобится только если мы его взяли по ошибке, а этот факт обнаруживается сразу. Так что дело оказывается гораздо проще
void DescRetire(descriptor* desc) { descriptor_queue old_queue, new_queue; do { old_queue = queue_head; desc->Next = (descriptor*)old_queue.DescAvail; new_queue.DescAvail = (unsigned __int64)desc; new_queue.tag = old_queue.tag + 1; } while (!compare_and_swap64k(queue_head, old_queue, new_queue)); }
Helpers
Приведем также вспомогательные функции по инициализации списков и т. д. Функции настолько самоочевидны, что их даже описывать как-то смысла нет.
static void organize_list(void* start, unsigned long count, unsigned long stride) { char* ptr; unsigned long i; ptr = (char*)start; for (i = 1; i < count - 1; i++) { ptr += stride; *((unsigned long*)ptr) = i + 1; } }
static void organize_desc_list(descriptor* start, unsigned long count, unsigned long stride) { char* ptr; unsigned int i; start->Next = (descriptor*)(start + stride); ptr = (char*)start; for (i = 1; i < count - 1; i++) { ptr += stride; ((descriptor*)ptr)->Next = (descriptor*)((char*)ptr + stride); } ptr += stride; ((descriptor*)ptr)->Next = NULL; }
static descriptor* mask_credits(active oldactive) { return (descriptor*)oldactive.ptr; }
Суперблок просто запрашиваем у системы:
static void* AllocNewSB(size_t size, unsigned long alignement) { return VirtualAlloc(NULL, size, MEM_COMMIT, PAGE_READWRITE); }
Точно так же получаем большие блоки, запросив чуть больше под заголовок блока:
static void* alloc_large_block(size_t sz) { void* addr = VirtualAlloc(NULL, sz + HEADER_SIZE, MEM_COMMIT, PAGE_READWRITE); // If the highest bit of the descriptor is 1, then the object is large // (allocated / freed directly from / to the OS) *((char*)addr) = (char)LARGE; addr = (char*) addr + TYPE_SIZE; *((unsigned long *)addr) = sz + HEADER_SIZE; return (void*)((char*)addr + PTR_SIZE); }
А это — поиск в нашей таблице хипа адаптированного под нужный размер (bkb ноль если размер слишком велик):
static procheap* find_heap(size_t sz) { procheap* heap; // We need to fit both the object and the descriptor in a single block sz += HEADER_SIZE; if (sz > 2048) return NULL; heap = heaps[sz / GRANULARITY]; if (heap == NULL) { heap = VirtualAlloc(NULL, sizeof(procheap), MEM_COMMIT, PAGE_READWRITE); *((unsigned __int64*)&(heap->Active)) = 0; heap->Partial = NULL; heap->sc = &sizeclasses[sz / GRANULARITY]; heaps[sz / GRANULARITY] = heap; } return heap; }
А вот обертки для списков. Они добавлены исключительно для наглядности — кода с макаронами вокруг атомиков у нас более чем достаточно, чтобы в одну функцию yt складывать по надцать циклов вокруг compare and swap
static descriptor* ListGetPartial(sizeclass* sc) { return (descriptor*)lf_fifo_dequeue(&sc->Partial); }
static void ListPutPartial(descriptor* desc) { lf_fifo_enqueue(&desc->heap->sc->Partial, (void*)desc); }
Удаление строится банальнейше — перестройкой списка во временный и обратно:
static void ListRemoveEmptyDesc(sizeclass* sc) { descriptor *desc; lf_fifo_queue_t temp = LF_FIFO_QUEUE_STATIC_INIT; while (desc = (descriptor *)lf_fifo_dequeue(&sc->Partial)) { lf_fifo_enqueue(&temp, (void *)desc); if (desc->sb == NULL) DescRetire(desc); else break; } while (desc = (descriptor *)lf_fifo_dequeue(&temp)) lf_fifo_enqueue(&sc->Partial, (void *)desc); }
И несколько оберток вокруг partial lists
static void RemoveEmptyDesc(procheap* heap, descriptor* desc) { if (compare_and_swap_ptr(&heap->Partial, desc, NULL)) DescRetire(desc); else ListRemoveEmptyDesc(heap->sc); }
static descriptor* HeapGetPartial(procheap* heap) { descriptor* desc; do { desc = *((descriptor**)&heap->Partial); // casts away the volatile if (desc == NULL) return ListGetPartial(heap->sc); } while (!compare_and_swap_ptr(&heap->Partial, desc, NULL)); return desc; }
static void HeapPutPartial(descriptor* desc) { descriptor* prev; do { prev = (descriptor*)desc->heap->Partial; // casts away volatile } while (!compare_and_swap_ptr(&desc->heap->Partial, prev, desc)); if (prev) ListPutPartial(prev); }
Последний рывок — и выделять / освобождать готов!
И наконец мы готовы реализовать выделение памяти не в страйпе, все возможности для этого у нас уже есть.
Алгоритм прост — находим наш список, резервируем в нем место (попутно освобождая пустые блоки), и возвращаем его клиенту.
static void* MallocFromPartial(procheap* heap) { descriptor* desc; anchor oldanchor, newanchor; unsigned __int64 morecredits; void* addr; retry: desc = HeapGetPartial(heap); if (!desc) return NULL; desc->heap = heap; do { // reserve blocks newanchor = oldanchor = desc->Anchor; if (oldanchor.state == EMPTY) DescRetire(desc); goto retry; } // oldanchor state must be PARTIAL // oldanchor count must be > 0 morecredits = min(oldanchor.count - 1, MAXCREDITS); newanchor.count -= morecredits + 1; newanchor.state = morecredits > 0 ? ACTIVE : FULL; } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor)); do { // pop reserved block newanchor = oldanchor = desc->Anchor; addr = (void*)((unsigned __int64)desc->sb + oldanchor.avail * desc->sz); newanchor.avail = *(unsigned long*)addr; ++newanchor.tag; } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor)); if (morecredits > 0) UpdateActive(heap, desc, morecredits); *((char*)addr) = (char)SMALL; addr = (char*) addr + TYPE_SIZE; *((descriptor**)addr) = desc; return ((void *)((char*)addr + PTR_SIZE)); }
Теперь рассмотрим как вернуть нам память обратно в список. В общем — классический алгоритм: по переданному нам указателю восстанавливаем дескриптор, и по якорю запомненному в дескрипторе — попадаем в нужное нам место суперблока и отмечаем нужный кусочек как свободный, а кто до сих дочитал тому пиво. И конечно же пара проверок — не надо ли нам освободить весь суперблок, а то в нем он последний неосвобожденный.
void my_free(void* ptr) { descriptor* desc; void* sb; anchor oldanchor, newanchor; procheap* heap = NULL; if (!ptr) return; // get prefix ptr = (void*)((char*)ptr - HEADER_SIZE); if (*((char*)ptr) == (char)LARGE) { munmap(ptr, *((unsigned long *)((char*)ptr + TYPE_SIZE))); return; } desc = *((descriptor**)((char*)ptr + TYPE_SIZE)); sb = desc->sb; do { newanchor = oldanchor = desc->Anchor; *((unsigned long*)ptr) = oldanchor.avail; newanchor.avail = ((char*)ptr - (char*)sb) / desc->sz; if (oldanchor.state == FULL) newanchor.state = PARTIAL; if (oldanchor.count == desc->maxcount - 1) { heap = desc->heap; // instruction fence. newanchor.state = EMPTY; } else ++newanchor.count; // memory fence. } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor)); if (newanchor.state == EMPTY) { munmap(sb, heap->sc->sbsize); RemoveEmptyDesc(heap, desc); } else if (oldanchor.state == FULL) HeapPutPartial(desc); }
На что надо обратить внимание — освобожденные кусочки попадают в partial list, и было бы неплохо добавить проверку — а не попадает ли наш кусочек на вершину Active страйпа, это бы неплохо повысило эффективность вырожденного случая «выделяем и освобождаем в цикле». Но это уже в качестве домашнего задания.
Выводы
В данной крайне занудной и длинной работе мы рассмотрели как можно построить свой аллокатор на lock-free FIFO lists, узнали что это вообще такое, и освоили немало трюков по работе с атомиками. Надеюсь, умение группировать списки в страйпы поможет не только в деле написания своего менеджера памяти.
Дополнительные материалы
ссылка на оригинал статьи http://habrahabr.ru/post/162187/
Добавить комментарий