Простейшие Lock-Free объекты для двух потоков

в 14:11, , рубрики: Delphi, lock-free, Программирование

Здесь было много статей об универсальных Lock-free объектах, однако, для некоторых частных случаев они излишне громоздки. Мой случай как раз таким и являлся: требовалось организовать одностороннюю передачу информации от одного потока другому. Главный поток запускает рабочий, после чего он может только запросить его остановку и никак больше управлять он им не может. В свою очередь рабочий поток может уведомлять главный о своем текущем состоянии (прогрессе выполнения), а также отсылать промежуточные результаты выполнения. Получается, что требуется только передача данных от рабочего к главному потоку.

Разумеется, возможно, я изобрёл велосипед или, хуже того, велосипед с глюками. Поэтому комментарии и критика очень приветствуются!

Объект состояния

Состояние нашего рабочего потока представлено в виде некоторого класса. При этом главный поток не обязан всегда забирать данные, хранящиеся в объекте состояния (например, не важно, если главный поток пропустит промежуточное значение прогресса выполнения, ему важно получить последнее актуальное на данный момент).

Для реализации lock-free передачи состояния нам потребуется три его экземпляра (разных объектов одного класса):

var
  ReadItem: TLockFreeWorkState;
  CurrentItem: TLockFreeWorkState;
  WriteItem: TLockFreeWorkState;

Идея такова: рабочий поток имеет свободный доступ к объекту WriteItem. Когда все данные сохранены выполняется операция InterlockedExchange с объектом в CurrentItem, после чего главный поток каким-то образом уведомляется о готовности нового состояния (в моем примере использован обычный PostMessage). Главный поток в обработчике уведомления выполняет операцию InterlockedExchange объекта CurrentItem с объектом ReadItem, после чего может свободно читать данные из ReadItem.

Получается такой себе «пузырек»: данные о состоянии появляются во WriteItem и далее «всплывают» через CurrentItem в ReadItem. Кстати, я не придумал нормального названия для базового класса такой структуры, поэтому назвал просто TLockFreeWorkState (возможно у кого-то найдется идея получше).

Тут есть один нюанс: главный поток может обращаться за текущим состоянием в любое время. Если мы всегда будет выполнять InterlockedExchange, то попеременно будем возвращать актуальное и предыдущее состояние.

Предотвратить это нам поможет обычный флажок Newest в классе. При записи состояния рабочий поток всегда выставляет WriteItem.Newest := True, и после InterlockedExchange этот флажок оказывается в CurrentItem. Главный поток в начале проверяет CurrentItem.Newest и, только если он True, делает InterlockedExchange после чего сразу его сбрасывает ReadItem.Newest в False. Я посчитал, что читать CurrentItem.Newest из главного потока безопасно, но поправьте меня, если не прав.

Теперь все это в виде упрощенного кода (опущено привидение типов для большей наглядности):

type
  TLockFreeWorkState = class
  public
    Newest: Boolean;
  end;

function Read(var CurrentItem, ReadItem: TLockFreeWorkState): Boolean;
begin
  if CurrentItem.Newest then begin
    ReadItem := InterlockedExchangePointer(CurrentItem, ReadItem);
    ReadItem.Newest := False;
    Result := True;
  end else
    Result := False;
end;

procedure Write(var CurrentItem, WriteItem: TLockFreeWorkState);
begin
  WriteItem.Newest := True;
  WriteItem := InterlockedExchangePointer(CurrentItem, WriteItem);
end;

Объект очереди

В чем-то тут подход схожий, но для реализации нам потребуется изначально только один объект, но две ссылки на него:

var
  ReadQueue: TLockFreeWorkQueue;
  WriteQueue: TLockFreeWorkQueue;

Изначально создается один экземпляр TLockFreeWorkQueue и записывается в переменные ReadQueue и WriteQueue. Класс представляет собой кольцевой буфер и имеет следующее описание:

  TLockFreeWorkQueue = class    
  public
    Head: Integer;
    Tail: Integer;
    Items: array[0..QueueCapacity - 1] of TObject;
  end;

где QueueCapacity является некоторой константой (больше нуля), которая определяет длину кольцевого буфера.

При добавлении элемента в очередь рабочий поток выполняет InterlockedExchangeComparePointer элемента WriteQueue.Items[Tail]. При этом элемент сравнивается с Nil и в случае успеха в него записывается добавляемый элемент. Если операция прошла успешно, то значение Tail увеличивается на 1 и сбрасывается в 0, если достигнут QueueCapacity. Мы можем свободно оперировать с Tail, так как доступ к этой переменной имеет только рабочий поток (поток-писатель). Также после этого рабочий поток должен уведомить главный о том, что в очереди появились элементы. Если операция не удалась, то это означает, что очередь заполнена, но об этом позже.

Главный поток по уведомлению от рабочего начинает цикл чтения элементов из очереди (на самом деле чтение можно начинать в любой момент). Для извлечения элемента вызывается InterlockedExchangePointer для элемента ReadQueue.Items[Head] куда записывается значение Nil. Если извлеченный элемент не Nil, то значение Head увеличивается на 1 и сбрасывается в 0, если достигнут QueueCapacity.

Теперь разберемся со случаем переполнения буфера. Для новых элементов мы вполне может создать новый объект очереди и продолжить писать в него, а чтобы этот объект можно было найти потоку-читателю, мы должны передать на него ссылку в текущем заполненном объекте очереди. Для этого добавим дополнительное поле NextQueue в класс:

  TLockFreeWorkQueue = class    
  public
    Head: Integer;
    Tail: Integer;
    Items: array[0..QueueCapacity - 1] of TObject;
    NextQueue: TLockFreeWorkQueue;
  end;

Теперь если при записи элемента InterlockedExchangeComparePointer возвращает не Nil (очередь заполнена), то создаем новый объект очереди NewWriteQueue: TLockFreeWorkQueue, записываем добавляемый элемент в нее, выполняем InterlockedExchangePointer с переменной WriteQueue.NextQueue и в конце сохраняем NewWriteQueue в переменной WriteQueue. Таким образом после этой операции значения в ReadQueue и WriteQueue уже будут ссылаться на разные объекты.

В главном потоке нам нужно добавить обработку пустой очереди. Если при чтении InterlockedExchangePointer для элемента ReadQueue.Items[Head] возвращает Nil, то нам необходимо дополнительно проверить поле NextQueue, для чего мы также выполняем InterlockedExchangePointer(ReadQueue.NextQueue, Nil). Если при этом возвращается не Nil, то сохраняем объект в NewReadQueue, удаляем текущий объект ReadQueue, и присваиваем этой переменной значение NewReadQueue.

Вот упрощенный код для операций добавления элемента в очередь:

procedure AddQueueItem(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
var
  NewWriteQueue: TLockFreeWorkQueue;
begin
  if InterlockedCompareExchangePointer(WriteQueue.Items[WriteQueue.Tail]), Item, Nil) = Nil then begin
    // Added successfully
    Inc(WriteQueue.Tail);
    if WriteQueue.Tail = QueueCapacity then
      WriteQueue.Tail := 0;
  end else begin
    // WriteQueue full. Create new chained queue.
    NewWriteQueue := TLockFreeWorkQueue.Create;
    NewWriteQueue.Items[0] := Item;
    Inc(NewWriteQueue.Tail);
    if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue
      NewWriteQueue.Tail := 0;
    InterlockedExchangePointer(WriteQueue.NextQueue, NewWriteQueue);
    WriteQueue := NewWriteQueue;
  end;
end;

и извлечения элемента из очереди:

function ExtractQueueItem(var ReadQueue: TLockFreeWorkQueue): TObject;
var
  NewReadQueue: TLockFreeWorkQueue;
begin
  Result := Nil;
  repeat
    Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], Nil);
    if Result = Nil then begin
      // No new items in this queue. Check next queue is available
      NewReadQueue := InterlockedExchangePointer(ReadQueue.NextQueue, Nil);
      if Assigned(NewReadQueue) then begin
        ReadQueue.Free;
        ReadQueue := NewReadQueue;
      end else
        // No new item in queue
        Exit;
    end;
  until Result <> Nil;
  // Item extracted successfully
  Inc(ReadQueue.Head);
  if ReadQueue.Head = QueueCapacity then
    ReadQueue.Head := 0;
end;

В этом коде я возможно несколько перестраховался. Не уверен, что для операций с полем NextQueue вообще нужно применять InterlockedExchangePointer, возможно будет безопасным выполнять прямое чтение и запись.

Тестовый пример

Рабочий и причесанный код вместе с простеньким консольным примером можно посмотреть под спойлером.

Тестовый пример

program LockFreeTest;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  SysUtils, Classes, Windows, Messages;

// Lock-free work thread state ////////////////////////////////////////////////
type
  TLockFreeWorkState = class
  protected
    FNewest: Boolean;
  public
    class function Read(var CurrentItem, ReadItem): Boolean;
    class procedure Write(var CurrentItem, WriteItem);
    property Newest: Boolean read FNewest write FNewest;
  end;

class function TLockFreeWorkState.Read(var CurrentItem, ReadItem): Boolean;
begin
  if TLockFreeWorkState(CurrentItem).Newest then begin
    pointer(ReadItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(ReadItem));
    TLockFreeWorkState(ReadItem).Newest := False;
    Result := True;
  end else
    Result := False;
end;

class procedure TLockFreeWorkState.Write(var CurrentItem, WriteItem);
begin
  TLockFreeWorkState(WriteItem).Newest := True;
  pointer(WriteItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(WriteItem));
end;

// Lock-free work thread queue ////////////////////////////////////////////////
type
  TLockFreeWorkQueue = class
  public const
    QueueCapacity = 4; // Small value for test purposes
  public type
    TLockFreeWorkQueueItems = array[0..QueueCapacity - 1] of TObject;
  public
    Head: Integer; // Access from main thread only
    Tail: Integer; // Access from work thread only
    NextQueue: TLockFreeWorkQueue;
    Items: TLockFreeWorkQueueItems;
  public
    destructor Destroy; override;
    class procedure Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject); static;
    class function Extract(var ReadQueue: TLockFreeWorkQueue): TObject; static;
  end;

destructor TLockFreeWorkQueue.Destroy;
var
  i: Integer;
begin
  // Free non-extracted items
  for i := 0 to QueueCapacity - 1 do
    Items[i].Free;
  // Free NextQueue if exists
  NextQueue.Free;
  inherited;
end;

class procedure TLockFreeWorkQueue.Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
var
  NewWriteQueue: TLockFreeWorkQueue;
begin
  // Check item assigned (can't add empty items)
  if not Assigned(Item) or not Assigned(WriteQueue) then
    Exit;
  if InterlockedCompareExchangePointer(pointer(WriteQueue.Items[WriteQueue.Tail]), pointer(Item), Nil) = Nil then begin
    // Added successfully
    Inc(WriteQueue.Tail);
    if WriteQueue.Tail = QueueCapacity then
      WriteQueue.Tail := 0;
  end else begin
    // WriteQueue full. Create new chained queue.
    NewWriteQueue := TLockFreeWorkQueue.Create;
    NewWriteQueue.Items[0] := Item;
    Inc(NewWriteQueue.Tail);
    if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue
      NewWriteQueue.Tail := 0;
    InterlockedExchangePointer(pointer(WriteQueue.NextQueue), NewWriteQueue);
    WriteQueue := NewWriteQueue;
  end;
end;

class function TLockFreeWorkQueue.Extract(var ReadQueue: TLockFreeWorkQueue): TObject;
var
  NewReadQueue: TLockFreeWorkQueue;
begin
  Result := Nil;
  if not Assigned(ReadQueue) then
    Exit;
  repeat
    Result := InterlockedExchangePointer(pointer(ReadQueue.Items[ReadQueue.Head]), Nil);
    if Result = Nil then begin
      // No new items in this queue. Check next queue is available
      NewReadQueue := InterlockedExchangePointer(pointer(ReadQueue.NextQueue), Nil);
      if Assigned(NewReadQueue) then begin
        ReadQueue.Free;
        ReadQueue := NewReadQueue;
      end else
        // No new item in queue
        Exit;
    end;
  until Result <> Nil;
  // Item extracted successfully
  Inc(ReadQueue.Head);
  if ReadQueue.Head = QueueCapacity then
    ReadQueue.Head := 0;
end;

// Test work thread ///////////////////////////////////////////////////////////
const
  WM_MAINNOTIFY = WM_USER + 1;

type
  TWorkThreadState = class(TLockFreeWorkState)
  public
    Progress: Integer;
  end;

  TWorkThreadQueueItem = class
  public
    ItemData: Integer;
  end;

  TWorkThread = class(TThread)
  protected
    FMainHandle: THandle;
    FMainNotified: Integer;
    // State fields
    FStateRead: TWorkThreadState;
    FStateCurrent: TWorkThreadState;
    FStateWrite: TWorkThreadState;
    // Queue fields
    FQueueRead: TLockFreeWorkQueue;
    FQueueWrite: TLockFreeWorkQueue;
    // Debug (test) fiels
    FDebugReadQueue: Boolean;
    procedure Execute; override;
    procedure SetState;
    procedure AddQueueItem(Item: TWorkThreadQueueItem);
    procedure NotifyMain;
  public
    constructor Create(CreateSuspended: Boolean);
    destructor Destroy; override;
    function GetState: TWorkThreadState;
    function ExtractQueueItem: TWorkThreadQueueItem;
    procedure NotificationProcessed;
    property MainHandle: THandle read FMainHandle;
  end;

constructor TWorkThread.Create(CreateSuspended: Boolean);
begin
  inherited Create(CreateSuspended);
  // State objects
  FStateRead := TWorkThreadState.Create;
  FStateCurrent := TWorkThreadState.Create;
  FStateWrite := TWorkThreadState.Create;
  // Queue objects
  FQueueRead := TLockFreeWorkQueue.Create;
  FQueueWrite := FQueueRead;
end;

destructor TWorkThread.Destroy;
begin
  inherited;
  FStateRead.Free;
  FStateCurrent.Free;
  FStateWrite.Free;
  // Always destroy read queue only: only read queue may have NextQueue reference
  FQueueRead.Free;
end;

procedure TWorkThread.NotifyMain;
begin
  if InterlockedExchange(FMainNotified, 1) = 0 then
    PostMessage(FMainHandle, WM_MAINNOTIFY, 0, 0);
end;

procedure TWorkThread.NotificationProcessed;
begin
  InterlockedExchange(FMainNotified, 0);
end;

function TWorkThread.GetState: TWorkThreadState;
begin
  TLockFreeWorkState.Read(FStateCurrent, FStateRead);
  Result := FStateRead;
end;

procedure TWorkThread.SetState;
begin
  TLockFreeWorkState.Write(FStateCurrent, FStateWrite);
end;

procedure TWorkThread.AddQueueItem(Item: TWorkThreadQueueItem);
begin
  TLockFreeWorkQueue.Add(FQueueWrite, Item);
end;

function TWorkThread.ExtractQueueItem: TWorkThreadQueueItem;
begin
  Result := TWorkThreadQueueItem(TLockFreeWorkQueue.Extract(FQueueRead));
end;

procedure TWorkThread.Execute;
const
  TestQueueCountToFlush = 10;
var
  ProgressIndex: Integer;
  TestQueueCount: Integer;
  Item: TWorkThreadQueueItem;
begin
  TestQueueCount := 0;
  ProgressIndex := 0;
  while not Terminated do begin
    // Send current progress
    if FStateWrite.Progress <> ProgressIndex then begin
      // All state object fields initialization required
      FStateWrite.Progress := ProgressIndex;
      SetState;
      NotifyMain;
    end;
    // Emulate calculation
    Sleep(500);
    Inc(ProgressIndex);
    // Put intermediate result in queue
    Item := TWorkThreadQueueItem.Create;
    Item.ItemData := ProgressIndex;
    AddQueueItem(Item);
    Inc(TestQueueCount);
    if TestQueueCount = TestQueueCountToFlush then begin
      TestQueueCount := 0;
      // Allow queue reading from main thread
      FDebugReadQueue := True;
      NotifyMain;
    end;
  end;
end;

// Test application ///////////////////////////////////////////////////////////
type
  TMain = class
  protected
    FHandle: THandle;
    FThread: TWorkThread;
    procedure WndProc(var Message: TMessage);
  public
    constructor Create;
    destructor Destroy; override;
    function Run: Boolean;
    property Handle: THandle read FHandle;
  end;

var
  Main: TMain;

constructor TMain.Create;
begin
  FHandle := AllocateHWnd(WndProc);
  FThread := TWorkThread.Create(True);
  FThread.FMainHandle := Handle;
  FThread.Start;
  writeln('Work thread started');
end;

destructor TMain.Destroy;
begin
  writeln('Stopping work thread...');
  FThread.Free;
  writeln('Work thread stopped');
  DeallocateHWnd(FHandle);
  inherited;
end;

procedure TMain.WndProc(var Message: TMessage);
var
  State: TWorkThreadState;
  Item: TWorkThreadQueueItem;
begin
  if Message.Msg = WM_MAINNOTIFY then begin
    FThread.NotificationProcessed;
    State := FThread.GetState;
    // Show current progress
    writeln('Work progress ', State.Progress);
    // Check queue reading allowed
    if FThread.FDebugReadQueue then begin
      writeln('Read queue...');
      repeat
        Item := FThread.ExtractQueueItem;
        try
          if Assigned(Item) then
            writeln('Queue item: ', Item.ItemData);
        finally
          Item.Free;
        end;
      until not Assigned(Item);
      FThread.FDebugReadQueue := False;
    end;
  end else
    Message.Result := DefWindowProc(Handle, Message.Msg, Message.wParam, Message.lParam);
end;

function TMain.Run: Boolean;
var
  Msg: TMsg;
begin
  writeln('Start message loop (Ctrl+C to break)');
  Result := True;
  while Result do
    case Integer(GetMessage(Msg, Handle, 0, 0)) of
      0:
        Break;
      -1:
        Result := False;
      else
        begin
          TranslateMessage(Msg);
          DispatchMessage(Msg);
        end;
    end;
end;

// Console event handler //////////////////////////////////////////////////////

function ConsoleEventProc(CtrlType: DWORD): BOOL; stdcall;
begin
  Result := False;
  case CtrlType of
    CTRL_CLOSE_EVENT,
    CTRL_C_EVENT,
    CTRL_BREAK_EVENT:
      if Assigned(Main) then begin
        PostMessage(Main.Handle, WM_QUIT, 0, 0);
        Result := True;
      end;
  end;
end;

// Main procedure /////////////////////////////////////////////////////////////

begin
  {$IFDEF DEBUG}
  ReportMemoryLeaksOnShutdown := True;
  {$ENDIF}
  try
    SetConsoleCtrlHandler(@ConsoleEventProc, True);
    Main := TMain.Create;
    try
      Main.Run;
    finally
      FreeAndNil(Main);
    end;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
end.

В нормальной ситуации при появлении элемента в очереди он при первой возможности должен извлекаться главным потоком. Однако для тестирования переполнения очереди я добавил поле TWorkThread.FDebugReadQueue, которое при значении False запрещает главному потоку читать из очереди (в методе TWorkThread.Execute введна константа TestQueueCountToFlush = 10, которая разрешает главному потоку чтение только после 10 добавленных элементов).

К сожалению тестовый пример слишком прост и не генерирует коллизий чтения/записи между потоками, когда переключение потока происходит внутри служебных функций чтения/записи. Но тут я не уверен, можно ли вообще проверить все узкие места алгоритма и во что нужно превратить код для этого.

Автор: goobit

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js