- PVSM.RU - https://www.pvsm.ru -
ConcurrentQueue можно отнести к lock-free конкурентным структурам данных. В ее реализации нет блокировок (lock, Mutex…) и реализована она с использованием:
— классической функции CompareExchange;
— SpinWait
— volatile (используется как memory-barrier)
В основу ConcurrentQueue заложена структура ring-buffer (кольцевой буфер [1]).
Кольцевой буфер идеально подходит для реализации структуры данных «очередь» (FIFO).
В его основе лежит массив данных и 2 указателя – начало (start) и конец (end).
Предусмотрено две основные операции:
Устройство ConcurrentQueue немного сложнее, чем классический кольцевой буфер. В его реализации используется понятие сегмента (Segment). ConcurrentQueue состоит из связанного списка (однонаправленного) сегментов. Размер сегмента равен 32.
private class Segment {
volatile VolatileBool[] m_state;
volatile T[] m_array;
volatile int m_low;
volatile int m_high;
volatile Segment m_next;
}
Первоначально в ConcurrentQueue создается 1 сегмент
По мере необходимости к нему справа добавляется новые сегменты
В результате получается однонаправленный связанный список. Начало связанного списка задает m_head, конец – m_tail. Ограничения:
Ниже представлен примерный алгоритм добавление элементов в сегмент.
index = Interlocked.Increment(ref this.m_high);
if (index <= 31)
{
m_array[index] = value;
m_state[index].m_value = true;
}
m_state – массив состояния ячеек, если значение true – элемент записан в ячейку, если false — еще нет. По сути, это некий «Commit» записи. Нужен он для того, чтобы между операциями увеличения индекса Interlocked.Increment и записью значения m_array[index] = value не произошло чтение элемента другим потоком. Тогда чтение данных будет осуществляться после выполнения:
while (!this.m_state[index].m_value)
spinWait2.SpinOnce();
Как только m_high текущего сегмента становится равным 31, запись в текущий сегмент прекращается и создается новый сегмент (текущие сегменты продолжают жить своей жизнью).
m_next = new ConcurrentQueue<T>.Segment(this.m_index + 1L, this.m_source);
m_source.m_tail = this.m_next;
m_next – ссылка на следующий сегмент
m_source.m_tail – ссылка последний сегмент списка сегментов.
В основе выборки элементов из очереди лежат две базовые функциональности:
public static extern int CompareExchange(ref int location1, int value, int comparand);
System.Threading.SpinWait is a lightweight synchronization type that you can use in low-level scenarios to avoid the expensive context switches and kernel transitions that are required for kernel events. On multicore computers, when a resource is not expected to be held for long periods of time, it can be more efficient for a waiting thread to spin in user mode for a few dozen or a few hundred cycles, and then retry to acquire the resource. If the resource is available after spinning, then you have saved several thousand cycles. If the resource is still not available, then you have spent only a few cycles and can still enter a kernel-based wait. This spinning-then-waiting combination is sometimes referred to as a two-phase wait operation.
Примерный алгоритм работы выборки:
SpinWait spinWait1 = new SpinWait();
int low = this.Low;
if (Interlocked.CompareExchange(ref this.m_low, low + 1, low) == low)
{
SpinWait spinWait2 = new SpinWait();
while (!this.m_state[low].m_value)
spinWait2.SpinOnce();
result = this.m_array[low];
Код IsEmpty:
ConcurrentQueue<T>.Segment segment = this.m_head;
if (!segment.IsEmpty)
return false;
if (segment.Next == null)
return true;
SpinWait spinWait = new SpinWait();
for (; segment.IsEmpty; segment = this.m_head)
{
if (segment.Next == null)
return true;
spinWait.SpinOnce();
}
return false;
Т.е. по сути, это найти первый непустой сегмент. Если он найден – очередь не пуста.
Код Count:
ConcurrentQueue<T>.Segment head;
ConcurrentQueue<T>.Segment tail;
int headLow;
int tailHigh;
this.GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
if (head == tail)
return tailHigh - headLow + 1;
return 32 - headLow + 32 * (int) (tail.m_index - head.m_index - 1L) + (tailHigh + 1);
По сути, он ищет первый и последний сегмент и на основе этих двух сегментов вычисляет кол-во элементов.
Вывод — операция Count будет занимать больше процессорного времени, чем IsEmpty.
Структура ConcurrentQueue поддерживает технологию снепшотов для получения целостного набора элементов.
Целостные данные возвращают:
Операторы выше так же работаю без блокировок, а целостность достигается за счет введения счетчика
volatile int m_numSnapshotTakers
в рамках всей очереди — число операций, работающих со снепшотами в текущий момент времени. Т.е. каждая операция, которая хочет получить целостную картину, должна реализовать следующую код:
Interlocked.Increment(ref this.m_numSnapshotTakers);
try
{
...//Итератор по всем сегментам
}
finally
{
Interlocked.Decrement(ref this.m_numSnapshotTakers);
}
В дополнении к этому, изменения у нас «пишет» только операция Dequeue, поэтому только в ней проверяется необходимость удалять ссылку на элемент очереди:
if (this.m_source.m_numSnapshotTakers <= 0)
this.m_array[low] = default (T);
Автор: avsevolodov
Источник [2]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/struktury-danny-h/77211
Ссылки в тексте:
[1] кольцевой буфер: https://ru.wikipedia.org/wiki/%D0%9A%D0%BE%D0%BB%D1%8C%D1%86%D0%B5%D0%B2%D0%BE%D0%B9_%D0%B1%D1%83%D1%84%D0%B5%D1%80
[2] Источник: http://habrahabr.ru/post/245837/
Нажмите здесь для печати.