Шустрый потокобезопасный менеджер кучи и голый Си

в 15:39, , рубрики: C, c++, Алгоритмы, параллельное программирование, Программирование, программирование; c; c++;, метки:

При планировании любой задачи мы стремимся как можно точнее конкретизировать запросы, определить исходные данные и по возможности избавиться от любой неопределенности, мешающей просчитать конечный результат. Однако при разработке высокоуровневой логики не всегда уделяется внимание таким простым казалось бы вещам, как размещение данных в памяти, менеджмент потоков, обрабатывающих наш функционал, особенности реализации динамических массивов или бинарный интерфейс процедур. Когда написанная программа предельно лаконична и оптимизирована, но при этом работает не так быстро, как хотелось бы, закономерно возникает вопрос: «а что еще можно улучшить?» Насколько можно доверять низкоуровневому инструментарию, написанному профессиональными программистами, безусловно разбирающимися в своем деле, но при этом ни черта не понимающими в тех идеях, что вы хотите реализовать? Фрагментация, зацикленность, прерывания, события, объекты, уведомления, каждое новое знакомство с Си-шными или WinAPI-шными библиотеками подталкивает к очевидной мысли: «зачем такая громоздкая реализация?» Почему нельзя просто сделать менеджер кучи выделяющий память за строгое количество шагов? Использовать real-time статистику при работе с разделяемыми данными, вместо сложной системы семафоров и уведомлений? Наращивать динамические массивы без переразмещения и обращаться к случайной ячейке за одинаковое время после любого количества реформаций? Миссия не кажется невыполнимой. Осталось только попробовать.

Предлагаю широкому вниманию процедурную реализацию менеджера кучи, способного выделять диапазоны памяти заказных размеров. Цикл поиска при этом не превышает нормированных пределов. Работа с памятью состоит всего из двух процедур: memAlloc и memRecycle, выполняющих связанные функции. Потокобезопасность поддерживается с помощью дополнительного инструментария, в свою очередь состоящего еще из нескольких процедур. О распараллеливании немного поподробнее: принцип базируется на ассоциированной блокировке разделяемых данных, схожих с блокировкой шины методами Interlocked, однако без блокировки шины. Весь процесс сводится к созданию позиционных стопоров, являющихся по сути диапазонами памяти, содержащими однобайтовые метки состояний каждого из потоков в доступном пуле. Размер выводится из этого соотношения. Поскольку я использую восьмипотоковый пул (а больше мне не нужно), то позиционные стопоры у меня занимают 8 байт (64 бита). Перед перезаписью разделяемой информации стопор блокируется исполняющим потоком, записывая метку в байт под смещением своего номера в пуле. Другие потоки не будут работать с разделяемыми данными пока стопор не обнулится, выполняя Sleep, либо откладывая задачу, либо считая овечек в цикле, на выбор программиста.

Клоны Interlocked процедур — threadExchange, threadCompareExchange и threadIncrement выполняют те же функции, что и оригиналы. Однако при работе внутри потока из пула не блокируют шину. Вместо этого используется ассемблерная процедура, задача которой сводится к двухступенчатой проверке стопора и установке собственной метки (стоит отметить, что при работе вне пула, в потоке не имеющем номера, все же используется блокировка шины). Ее реализация ниже:

cnsBit PROC ;check and set bit
cmp rdx,0 ;первичная проверка на нумерацию потока
jl @negtv ;отрицательные значения не относятся к нумерованному пулу и обрабатываются через InterlockedExchange

cmp qword ptr [rcx],0 ;проверка метки на нуль
jne @fin ;выход при несоответствии

mov r8,rdx ;подготовка номера потока для смещения
mov rax,8 ;подготовка множителя, биты устанавливаются в начале каждого байта
mul rdx ;умножение для получения итогового смещения

mov byte ptr [rcx+r8],1 ;установка бита по нужному адресу
bts rdx,rax ;установка бита в маске по необходимому смещению

cmp qword ptr [rcx],rdx ;проверка метки по маске
jnz @end ;выход при несоответствии

@scess: ;блок соответствия
mov rax,1 ;собственный бит присутствует в диапазоне, сторонние биты отсутствуют
ret
@end: ;блок несоответствия
mov byte ptr [rcx+r8],0 ;удаление бита из метки
@fin: ;блок безусловного выхода
mov rax,0 ;отсутствует собственный бит, либо присутствуют сторонние биты
ret

@negtv: ;блок для ненумерованных потоков
xor rax,rax ;сравниваемый операнд равен нулю
lock cmpxchg qword ptr[rcx],rdx ;compare exchange
jnz @fin
jmp @scess
cnsBit ENDP

Наверно реализация Interlocked клонов не заслуживает внимания, однако кому-то возможно пригодится:

INT64 threadIncrement(INT64 threadNum, INT64 *counter, INT64 *stopper, INT64 value)	//итерация счетчиков. переданное значение прибавляется к находящемуся в целевом диапазоне, изначальное возвращается.
{
	INT64 ret = 0;
	for (;;) {
		if (cnsBit((INT64)stopper, threadNum) == 1) {
			ret = *counter;
			*counter += value;
			*stopper = 0;
			break;
		}
		falseCounter[threadNum] += 1;
		Sleep(1);
	}
	return ret;
}


INT64 threadExchange(INT64 threadNum, INT64 *pointer, INT64 *stopper, INT64 value)		//замена целевого значения на переданное. изначальное возвращается.
{
	INT64 ret = 0;
	for (;;) {
		if (cnsBit((INT64)stopper, threadNum) == 1) {
			ret = *pointer;
			*pointer = value;
			*stopper = 0;
			break;
		}
		falseCounter[threadNum] += 1;
		Sleep(1);
	}
	return ret;
}


INT64 threadCompareExchange(INT64 threadNum, INT64 *pointer, INT64 *stopper, INT64 value, INT64 compareVal)	//замена целевого значения на переданное при условии соответствия компаранду. изначальное возвращается.
{
	INT64 ret = 0;
	for (;;) {
		if (cnsBit((INT64)stopper, threadNum) == 1) {
			ret = *pointer;
			if (*pointer == compareVal) {
				*pointer = value;
			}
			*stopper = 0;
			break;
		}
		falseCounter[threadNum] += 1;
		Sleep(1);
	}
	return ret;
}

Далее описание собственно процедур работающих с памятью:
memoryAllock выделяет указанный отрезок из общего диапазона для размещения данных. Процедура работает с последовательным диапазоном, а также цепочкой непоследовательных отрезков, выделенных в ходе работы и разбитых на группы размерности. Цепочка не является непрерывной и представляет собой некоторое множество цепочек с персональными точками входа. Также цепочки отрезков для размещения разделены на блоки фиксированной длины (1000 шт) и представляют собой цепочки блоков, содержащих массив указателей на отрезки. Такая распределенная структура позволяет избежать излишнего многократного обращения к разделяемым данным при работе в параллельных потоках, поскольку счетчики, стопоры, публичные указатели и т.д. существуют отдельно в каждом блоке. Процедура позволяет независимо от размера диапазона памяти или цепочки отрезков производить выделение памяти или освобождение за фиксированное количество шагов. При наличии освобожденных ранее отрезков функция прежде всего предварительно проверяет среди них подходящий по размеру.

Указатели на отрезки, расположенные в блоках размерных групп, использутся одноразово. Сами блоки также извлекаются из цепи после полной однократной отработки. Recycle-блоки с указателями представляют собой цепочку из последовательно поступивших по времени указателей на освобожденные отрезки памяти, разгруппированные по размеру. Блоки ограничены по количеству хранимых указателей и в случае переполнения создаются дополнительные блоки, встраивающиеся в цепочку между текущим заполненным и следующим по размеру. Цепочка является динамической и в случае опустошения блока при повторных размещениях, блок вырезается из цепи. Изначально цепь состоит лишь из запаса неразмеченной памяти, и растет по мере возврата отработанных отрезков функцией Recycle.

Служебные данные отрезка: первые 8 байт — размер, последние 24 байта — указатель на начало отрезка, указатель на блок отработки и позицию в блоке. Каждый блок указателей устанавливает ссылку на себя в точку входа, если является первым в своей группе размерности, создавая цепь. При высвобождении всех указателей в текущей группе размерности, точка входа обнуляется. Работа с цепочкой начинается с нужной точки входа и дальнейшие перемещения осуществляются при помощи смежных указателей между блоками. При поиске необходимого отрезка памяти для размещения данных, функция Alloc проверяет точку входа округляя до следующей по размеру группы, таким образом для соответствия запросу будет достаточно первого отрезка в очереди без необходимости циклического поиска.

Описание структур:
отрезок выделяемой памяти
4 * INT64 + n
[-1] — заявленный размер отрезка
[n+1] — указатель на начало отрезка
[n+2] — указатель на блок отработки для сброса при дефрагментации
[n+3] — указатель на собственную позицию в блоке отработки

блок отработки с одноразовыми указателями:
(4 + n) * INT64
[-4] — указатель на следующий блок
[-3] — счетчик для извлекаемых указателей
[-2] — счетчик добавляемых указателей
[-1] — восьмипозиционный стопор
[0-999] — указатели на отрезки памяти одной размерной группы

INT64 pageSize;
SYSTEM_INFO sSysInfo;
INT64 SIZEOF_INT64 = sizeof(INT64);
INT64 pInvalidAddress = 0;

INT64 *baseMemStore = 0;
INT64 *memRecycleStore = 0;			//служебный диапазон указателей на размеченную память
INT64 *memRecyclePoint = 0;			//точки входа в цепочки блоков указателей
INT64 memRecycleBlock = 0;			//указатель на крайний выделенный блок в общем диапазоне
INT64 memClear = 0;				//модификатор для побайтового обнуления выделенного отрезка
INT64 memNoDefrag = 0;				//модификатор для переработки без дефрагментации
INT64 memBlockLimit = 1000;			//максимальный лимит блока для отработанных отрезков памяти
INT64 memFragmentCount = 0;			//количество фрагментированных отрезков памяти
INT64 memGroupLimit = 200;			//лимит групп размерностей (предел цикла поиска перед размещением)
INT64 memStoreSize = 1024 * 256 * 4096;		//предельный размер кучи

void memConstruct() {
	GetSystemInfo(&sSysInfo);
	pageSize = (INT64)sSysInfo.dwPageSize;
	memRecycleStore = (INT64*)VirtualAlloc(0, 4 * 256 * 4096, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE);
	memRecyclePoint = (INT64)memRecycleStore + 16;	//счетчик + стопор
	*memRecycleStore = (INT64)memRecycleStore + 16 + memGroupLimit * SIZEOF_INT64;
	baseMemStore = (INT64*)VirtualAlloc(0, memStoreSize, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE);
	*baseMemStore = (INT64)baseMemStore + 24;	//счетчик выделяемой памяти, стопор и заглушка для функции Recycle
}
INT64 memAlloc(INT64 threadNum, INT64 size, INT64 param)
{
	INT64 workPtr = 0;
	INT64 *chainPtr = 0;
	INT64 tempVal = 0;
	INT32 sizeGroup = 0;

	if (size < 3200)
		//проверка нижнего подмножества малых отрезков
		sizeGroup = size / 32;
	//проверка верхнего подмножества больших отрезков
	else sizeGroup = size / (1024 * 1024) + 100;

groupSearch:
	//блок проверки точек входа. при получении опустошенных цепочек процедура возвращает управление этому блоку.
	for (int i = sizeGroup + 1; i < memGroupLimit; i++) {
		sizeGroup = i;
		if (memRecyclePoint[i] != 0) {
			chainPtr = memRecyclePoint[i];
			break;
		}
	}

	//настройка блоков указателей
	if ((INT64)chainPtr != 0) {
		//если блок одноразовых указателей достиг предела, он удаляется
		if (chainPtr[-3] >= memBlockLimit) {
			//процедура делает прыжок на следующую позицию в цепи
			threadExchange(threadNum, &memRecyclePoint[sizeGroup], &chainPtr[-1], chainPtr[-4]); 
		//одноразовый блок отработки встраивается в цепочку для повторного использования блоков
		recycle:
		//цепочка имеет одну точку входа и встраивание происходит от начала цепи. ее значение сохраняется для сверки.
			chainPtr[-4] = tempVal = memRecycleBlock;
			//в случае параллельной перезаписи и провала сверки, процедура повторится
			if (threadCompareExchange(threadNum, &memRecycleBlock, &memRecycleStore[1], &chainPtr[-4], tempVal) != tempVal)
				goto recycle;

			//если цепь достигла нуля, новый отрезок выделяется из запаса
			chainPtr = memRecyclePoint[sizeGroup];
			if ((INT64)chainPtr == 0)
				goto getStore;
		}
	chainRead:
		//проверка соответствия числа помещенных в блок указателей и извлеченных
		if (chainPtr[-3] < chainPtr[-2]) {
			workPtr = threadExchange(threadNum, &chainPtr[chainPtr[-3]], &chainPtr[-1], 0);
			//итерация счетчика извлеченных указателей
			threadIncrement(threadNum, &chainPtr[-3], &chainPtr[-1], 1);
			if ((INT64)workPtr == 0)
				goto chainRead;

//фрагментация отрезка памяти
			//несоответствие доступного размера отрезка и заявленного требует дополнительной обработки
			tempVal = *(INT64*)(workPtr - 8);
			//если взятый из отработки отрезок слишком велик, процедура образует из него два отрезка. Ненужный будет возвращен в свою группу размерности.
			if (tempVal - size > 3200) {
				*(INT64*)(workPtr + size + 24) = tempVal - size - 32;
				*(INT64*)(workPtr + tempVal) = (INT64)workPtr + size + 32;
				*(INT64*)(workPtr + tempVal + 8) = *(INT64*)(workPtr + tempVal + 16) = 0;
				//отделенный лишний отрезок уходит на переработку
				memRecycle(threadNum, (INT64)workPtr + size + 32, &memNoDefrag);
			}
			//поскольку размер найденного отрезка может оказаться незначительно больше заявленного, происходит индексирование
			else size = *(INT64*)(workPtr - 8);
		}
		//если рабочие указатели в данный момент отсутствуют, отрезок берется из запаса
		else goto getStore;
	}
	//если группы подходящей размерности отсутствуют, новый отрезок берется из запаса
	else {
	getStore:
		//если просканированы не все регулярные точки входа, процедура возвращается на дополнительную проверку.
		if (sizeGroup < 100)	
			goto groupSearch;
		//при отсутствии иных возможностей отрезок берется из запаса, происходит сдвиг служебных данных ниже нуля
		workPtr = threadIncrement(threadNum, &baseMemStore[0], &baseMemStore[1], size + 32) + 8;
		goto allocation;
	}

	//настройка отрезка
allocation:
	//указатели на блок отработки сбрасываются
	*(INT64*)(workPtr + size + 8) = *(INT64*)(workPtr + size + 16) = 0;
	//указатель на начало отрезка
	*(INT64*)(workPtr + size) = (INT64)workPtr;
	//рабочий размер отрезка
	*(INT64*)(workPtr - 8) = size;

	//побайтовое обнуление рабочей области, при наличии необходимой команды
	if (param == &memClear) {
		for (int i = 0; i < size; i++)
			*(INT8*)(workPtr + i) = 0;
	}

	//возвращается указатель на нулевую точку рабочей области
	return workPtr;	
}
void memRecycle(INT64 threadNum, INT64 ptr, INT64 param)
{
	INT64 workPtr = ptr;
	INT64 tempSize = 0;
	INT64 *chainPtr = 0;
	INT64 *tempPtr = 0;
	INT64 tempVal = 0;
	INT32 sizeGroup = 0;

	//в случае передачи в процедуру предварительно фрагментированного отрезка, дефрагментация пропускается
	if (param == &memNoDefrag)	
		goto recycle;

	//дефрагментация освобожденных ранее отрезков
	for (;;) {
		//сверка указателей на предыдущий отрезок и блок отработки
		if (*(INT64*)(workPtr - 16) == 0 || **(INT64**)(workPtr - 16) != *(INT64*)(workPtr - 32))	
			break;
		tempVal = threadExchange(threadNum, *(INT64*)(workPtr - 16), *(INT64*)(workPtr - 24) - 8, 0);
		//если указатель на отрезок в блоке отработки был обнулен в параллельном потоке, процедура завершает поиск
		if (tempVal == 0)
			break;
		//при объединении отрезков, размер текущего прибавляется к следующему ниже в диапазоне
		tempSize = *(INT64*)(workPtr - 8);
		workPtr = *(INT64*)(workPtr - 32);
		*(INT64*)(workPtr - 8) += (tempSize + 32);
	}
	//итоговый размер обрабатываемого отрезка дополнительно сохраняется после всех преобразований
	tempSize = *(INT64*)(workPtr - 8);
	for (;;) {
		//проверка заполнения следующего блока
		if (*(INT64*)(workPtr + tempSize + 24) == 0)
			break;
		//размер впереди идущего отрезка прибавляются сразу, все дальнейшие вычисления идут с необходимым смещением.
		tempSize += *(INT64*)(workPtr + tempSize + 24) + 32;
		//сверка указателей на следующий отрезок и блок отработки
		if (*(INT64*)(workPtr + tempSize + 16) == 0 || **(INT64**)(workPtr + tempSize + 16) != *(INT64*)(workPtr + tempSize))
			break;
		tempVal = threadExchange(threadNum, *(INT64*)(workPtr + tempSize + 16), *(INT64*)(workPtr + tempSize + 8) - 8, 0);
		//если указатель на отрезок в блоке отработки был обнулен в параллельном потоке, процедура завершает поиск
		if (tempVal == 0)
			break;
		//при объединении заранее проиндексированный размер фиксируется в обрабатываемом отрезке
		*(INT64*)(workPtr - 8) = tempSize;
	}

recycle:
	//настройка блоков указателей
	//итоговый размер обрабатываемого отрезка дополнительно сохраняется после всех преобразований
	tempSize = *(INT64*)(workPtr - 8);
	if (tempSize < 3200)
		//проверка нижнего подмножества малых отрезков
		sizeGroup = tempSize / 32;
	//проверка верхнего подмножества больших отрезков
	else sizeGroup = tempSize / (1024 * 1024) + 100;
	
	chainPtr = tempPtr = memRecyclePoint[sizeGroup];
	//если точка входа указывает на существующую цепочку, цикл делает прыжок к последнему блоку
	if ((INT64)chainPtr != 0) {	
		for (;;) {
			if (chainPtr[-4] != 0)
				chainPtr = tempPtr = chainPtr[-4];
			else break;
		}
	}
	//если точка входа обнулена, создается новый блок указателей
	else {
	createBlock:
		//при наличии функция повторно использует отработанный одноразовый блок, в противном случае берет из запаса
		if (memRecycleBlock != 0)
			tempPtr = threadPointerScroll(threadNum, &memRecycleBlock, &memRecycleStore[1], &memRecycleBlock) + 4 * SIZEOF_INT64;
		else tempPtr = threadIncrement(threadNum, &memRecycleStore[0], &memRecycleStore[1], (memBlockLimit + 4) * SIZEOF_INT64) + 4 * SIZEOF_INT64;
		//служебные данные нового блока отработки сбрасываются
		tempPtr[0] = tempPtr[-1] = tempPtr[-2] = tempPtr[-3] = tempPtr[-4] = 0;
	}

	//итерация счетчика добавления указателей. указатель на отрезок будет добавлен в блок под полученным номером.
	tempVal = threadIncrement(threadNum, &tempPtr[-2], &tempPtr[-1], 1);
	//если блок переполнен, создается новый
	if (tempVal > memBlockLimit)
		goto createBlock;
	//запись указателя на отработанный отрезок памяти в блок под нужным номером
	tempPtr[tempVal] = workPtr;

	//реструктурирование цепочки групп размерности
	//при создании нового блока указателей, необходимо встроить его в цепь
	if ((INT64)chainPtr != 0 && (INT64)tempPtr != (INT64)chainPtr) {
	makeChain:
		chainPtr = threadCompareExchange(threadNum, &chainPtr[-4], &chainPtr[-1], tempPtr, 0);
		//в случае увеличения цепочки параллельным потоком, процедура делает прыжок на следующее звено и повторяет встраивание
		if ((INT64)chainPtr != 0)
			goto makeChain;
	}
	//указатель на новый блок помещается в точку входа, если она обнулена изначально, либо за время процедурных действий
	if (memRecyclePoint[sizeGroup] == 0)
		memRecyclePoint[sizeGroup] = (INT64)tempPtr;

	//финализация
	//после всех преобразований указатель на начало отрезка модифицируется
	*(INT64*)(workPtr + tempSize) = workPtr;
	//указатель на блок отработки и позицию для сброса при дальнейшей дефрагментации
	*(INT64*)(workPtr + tempSize + 8) = (INT64)tempPtr;
	*(INT64*)(workPtr + tempSize + 16) = &tempPtr[tempVal];
}
INT64 memPtrCheck(INT64 *ptr)	//процедура проверки адреса на соответствие диапазону памяти кучи
{
	if ((unsigned __int64)ptr > (unsigned __int64)baseMemStore && (unsigned __int64)ptr <= (unsigned __int64)*baseMemStore)
		return *ptr;
	return &pInvalidAddress;
}

Автор: Tatuin

Источник

Поделиться новостью

* - обязательные к заполнению поля