- PVSM.RU - https://www.pvsm.ru -
System.IO.Pipelines [1] — это новая библиотека, упрощающая организацию кода в .NET. Трудно обеспечить высокую производительность и точность, если приходится иметь дело со сложным кодом. Задача System.IO.Pipelines — упростить код. Подробнее под катом!
Библиотека появилась в результате усилий команды разработчиков .NET Core, которые стремились сделать Kestrel одним из самых быстрых веб-серверов в отрасли [2]. Она изначально задумывалась как часть реализации Kestrel, но превратилась в повторно используемый API, доступный в версии 2.1 в качестве BCL API первого класса (System.IO.Pipelines).
Чтобы правильно анализировать данные из потока или сокета, требуется написать большой объем стандартного кода. При этом существует множество подводных камней, которые усложняют и сам код, и его поддержку.
Начнем с простой задачи. Нам необходимо написать TCP-сервер, который получает от клиента сообщения с разделителями строк (n).
ОТСТУПЛЕНИЕ: как и в любой задаче, требующей высокой производительности, каждый конкретный случай стоит рассматривать исходя из особенностей вашего приложения. Возможно, тратить ресурсы на использование различных подходов, о которых пойдет речь далее, не имеет смысла, если масштаб сетевого приложения не очень велик.
Обычный код .NET перед использованием конвейеров выглядит примерно так:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
см. sample1.cs [3] на GitHub
Вероятно, этот код будет работать при локальном тестировании, но он имеет ряд ошибок:
Это наиболее распространенные ошибки чтения потоковых данных. Чтобы их избежать, необходимо внести ряд изменений:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data
linePosition = Array.IndexOf(buffer, (byte)'n', bytesConsumed, bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset
var lineLength = linePosition - bytesConsumed;
// Process the line
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line we consumed (including n)
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
см. sample2.cs [4] на GitHub
Повторюсь: это могло бы сработать при локальном тестировании, но иногда встречаются строки длиной больше 1 Кб (1024 байта). Необходимо увеличить размер входного буфера до тех пор, пока не будет найдена новая строка.
Кроме того, мы собираем буферы в массив при обработке длинных строк. Мы можем улучшить этот процесс с помощью ArrayPool, что позволит избежать повторного распределения буферов во время анализа длинных строк, поступающих от клиента.
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes
bytesBuffered += bytesRead;
do
{
// Look for a EOL in the buffered data
linePosition = Array.IndexOf(buffer, (byte)'n', bytesConsumed, bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset
var lineLength = linePosition - bytesConsumed;
// Process the line
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line we consumed (including n)
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
см. sample3.cs [5] на GitHub
Код работает, но теперь изменился размер буфера, в результате появляется множество его копий. Также используется больше памяти, поскольку логика не сокращает буфер после обработки строк. Чтобы этого избежать, можно сохранять список буферов, а не менять каждый раз размер буфера при поступлении строк длиннее 1 Кб.
Кроме того, мы не увеличиваем буфер размером 1 Кб, пока он полностью не опустеет. Это значит, что мы будем передавать в ReadAsync буферы все меньшего размера, в результате возрастет число вызовов операционной системы.
Мы постараемся исключить это и будем выделять новый буфер, как только размер существующего станет меньше 512 байт:
public class BufferSegment
{
public byte[] Buffer { get; set; }
public int Count { get; set; }
public int Remaining => Buffer.Length - Count;
}
async Task ProcessLinesAsync(NetworkStream stream)
{
const int minimumBufferSize = 512;
var segments = new List<BufferSegment>();
var bytesConsumed = 0;
var bytesConsumedBufferIndex = 0;
var segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };
segments.Add(segment);
while (true)
{
// Calculate the amount of bytes remaining in the buffer
if (segment.Remaining < minimumBufferSize)
{
// Allocate a new segment
segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };
segments.Add(segment);
}
var bytesRead = await stream.ReadAsync(segment.Buffer, segment.Count, segment.Remaining);
if (bytesRead == 0)
{
break;
}
// Keep track of the amount of buffered bytes
segment.Count += bytesRead;
while (true)
{
// Look for a EOL in the list of segments
var (segmentIndex, segmentOffset) = IndexOf(segments, (byte)'n', bytesConsumedBufferIndex, bytesConsumed);
if (segmentIndex >= 0)
{
// Process the line
ProcessLine(segments, segmentIndex, segmentOffset);
bytesConsumedBufferIndex = segmentOffset;
bytesConsumed = segmentOffset + 1;
}
else
{
break;
}
}
// Drop fully consumed segments from the list so we don't look at them again
for (var i = bytesConsumedBufferIndex; i >= 0; --i)
{
var consumedSegment = segments[i];
// Return all segments unless this is the current segment
if (consumedSegment != segment)
{
ArrayPool<byte>.Shared.Return(consumedSegment.Buffer);
segments.RemoveAt(i);
}
}
}
}
(int segmentIndex, int segmentOffest) IndexOf(List<BufferSegment> segments, byte value, int startBufferIndex, int startSegmentOffset)
{
var first = true;
for (var i = startBufferIndex; i < segments.Count; ++i)
{
var segment = segments[i];
// Start from the correct offset
var offset = first ? startSegmentOffset : 0;
var index = Array.IndexOf(segment.Buffer, value, offset, segment.Count - offset);
if (index >= 0)
{
// Return the buffer index and the index within that segment where EOL was found
return (i, index);
}
first = false;
}
return (-1, -1);
}
см. sample4.cs [6] на GitHub
В итоге код существенно усложняется. Во время поиска разделителя мы отслеживаем заполненные буферы. Для этого используется List, который отображает буферизованные данные при поиске нового разделителя строк. В результате ProcessLine и IndexOf будут принимать List вместо byte[], offset и count. Логика синтаксического анализа начнет обрабатывать один сегмент буфера или несколько.
И теперь сервер будет обрабатывать частичные сообщения и использовать объединенную память, чтобы уменьшить общее потребление памяти. Однако нужно сделать еще ряд изменений:
Сложность начинает зашкаливать (а это далеко не все случаи!). Для создания высокопроизводительной сети нужно написать очень сложный код.
Цель System.IO.Pipelines — упростить эту процедуру.
Давайте посмотрим, как работает System.IO.Pipelines:
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
return Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// Tell the PipeReader that there's no more data coming
writer.Complete();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition? position = null;
do
{
// Look for a EOL in the buffer
position = buffer.PositionOf((byte)'n');
if (position != null)
{
// Process the line
ProcessLine(buffer.Slice(0, position.Value));
// Skip the line + the n character (basically position)
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
}
}
while (position != null);
// Tell the PipeReader how much of the buffer we have consumed
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete
reader.Complete();
}
см. sample5.cs [7] на GitHub
В конвейерной версии нашего считывателя строк есть два цикла:
В отличие от первых примеров, здесь нет специально назначенных буферов. Это одна из основных функций System.IO.Pipelines. Все задачи по управлению буферами передаются реализациям PipeReader/PipeWriter.
Процедура упрощается: мы используем код только для бизнес-логики вместо того, чтобы реализовывать сложное управление буферами.
В первом цикле сначала вызывается PipeWriter.GetMemory(int), чтобы получить определенный объем памяти от основного записывателя. Затем вызывается PipeWriter.Advance(int), который сообщает PipeWriter, сколько данных фактически записано в буфер. После этого следует вызов PipeWriter.FlushAsync(), чтобы PipeReader получил доступ к данным.
Второй цикл потребляет буферы, которые были записаны PipeWriter, но изначально поступили от сокета. Когда возвращается запрос к PipeReader.ReadAsync(), мы получаем ReadResult, содержащий два важных сообщения: данные, считанные в форме ReadOnlySequence, а также логический тип данных IsCompleted, который сообщает считывателю, закончил ли записыватель работу (EOF). Когда будет найден разделитель конца строки (EOL) и проанализирована строка, мы разделим буфер на части, чтобы пропустить уже обработанный фрагмент. После этого вызывается PipeReader.AdvanceTo, и он сообщает PipeReader, сколько данных было потреблено.
В конце каждого цикла завершается работа и считывателя, и записывателя. В результате основной канал высвобождает всю выделенную память.
Кроме управления памятью System.IO.Pipelines выполняет другую важную функцию: просматривает данные в канале, но не потребляет их.
У PipeReader есть два основных API: ReadAsync и AdvanceTo. ReadAsync получает данные из канала, AdvanceTo сообщает PipeReader о том, что эти буферы больше не требуются считывателю, поэтому от них можно избавиться (например, вернуть в основной буферный пул).
Ниже приведен пример анализатора HTTP, который считывает данные из буферов частичных данных канала, пока не получит подходящую начальную строку.
Реализация канала хранит список связанных буферов, передающихся между PipeWriter и PipeReader. PipeReader.ReadAsync раскрывает ReadOnlySequence, являющийся новым типом BCL и состоящий из одного или нескольких сегментов ReadOnlyMemory<Т>. Он похож на Span или Memory, что дает нам возможность взглянуть на массивы и строки.
Внутри канала есть указатели, которые показывают, где в общем наборе выделенных данных располагаются считыватель и записыватель, а также обновляют их по мере записи и чтения данных. SequencePosition представляет собой единую точку в связанном списке буферов и используется для эффективного разделения ReadOnlySequence<Т>.
Поскольку ReadOnlySequence<Т> поддерживает один сегмент и более, то стандартной операцией высокопроизводительной логики является разделение быстрых и медленных путей исходя из количества сегментов.
В качестве примера приведем функцию, преобразующую ASCII ReadOnlySequence в строку:
string GetAsciiString(ReadOnlySequence<byte> buffer)
{
if (buffer.IsSingleSegment)
{
return Encoding.ASCII.GetString(buffer.First.Span);
}
return string.Create((int)buffer.Length, buffer, (span, sequence) =>
{
foreach (var segment in sequence)
{
Encoding.ASCII.GetChars(segment.Span, span);
span = span.Slice(segment.Length);
}
});
}
см. sample6.cs [8] на GitHub
В идеале чтение и анализ работают совместно: поток чтения потребляет данные из сети и помещает их в буферы, в то время как поток анализа создает подходящие структуры данных. Обычно анализ занимает больше времени, чем простое копирование блоков данных из сети. В результате поток чтения может с легкостью перегрузить поток анализа. Поэтому поток чтения будет вынужден либо замедлить работу, либо потреблять больше памяти, чтобы сохранять данные для потока анализа. Чтобы обеспечить оптимальную производительность, необходим баланс между частотой пауз и выделением большого объема памяти.
Для решения этой проблемы конвейер имеет две функции управления потоком данных: PauseWriterThreshold и ResumeWriterThreshold. PauseWriterThreshold определяет, сколько данных необходимо буферизовать до приостановки PipeWriter.FlushAsync. ResumeWriterThreshold определяет, сколько памяти может потребить считыватель до возобновления работы записывателя.
PipeWriter.FlushAsync «блокируется», когда количество данных в конвейерном потоке превысит лимит, установленный в PauseWriterThreshold, и «разблокируется», когда оно станет ниже установленного в ResumeWriterThreshold. Чтобы предотвратить превышение лимита потребления, используются всего два значения.
При использовании async/await последующие операции обычно вызываются либо в потоках пула, либо в текущем SynchronizationContext.
При осуществлении ввода-вывода очень важно тщательно контролировать, где он выполняется, чтобы эффективнее использовать кэш процессора. Это имеет критическое значение для высокопроизводительных приложений, таких как веб-серверы. System.IO.Pipelines использует PipeScheduler, чтобы определить место выполнения асинхронных ответных вызовов. Это позволяет очень точно контролировать, какие потоки использовать для ввода-вывода.
Пример практического применения — транспорт Kestrel Libuv, в котором обратные вызовы ввода-вывода выполняются по выделенным каналам цикла событий.
Мы также добавили в System.IO.Pipelines ряд новых простых типов BCL:
API находятся в nuget-пакете System.IO.Pipelines [1].
Пример приложения сервера .NET Server 2.1, использующего конвейеры для обработки строчных сообщений (из примера выше) см. здесь [17]. Его можно запустить с помощью dotnet run (или Visual Studio). В примере ожидается передача данных от сокета на порту 8087, затем полученные сообщения записываются на консоль. Для подключения к порту 8087 можно использовать клиент, например netcat или putty. Отправьте строчное сообщение и посмотрите, как это работает.
На данный момент конвейер работает в Kestrel и SignalR, и мы надеемся, что она найдет более широкое применение во множестве сетевых библиотек и компонентов сообщества .NET в будущем.
Автор: Александр Гуреев
Источник [18]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/c-2/296831
Ссылки в тексте:
[1] System.IO.Pipelines: https://www.nuget.org/packages/System.IO.Pipelines/
[2] самых быстрых веб-серверов в отрасли: https://www.techempower.com/benchmarks/#section=data-r16&hw=ph&test=plaintext
[3] sample1.cs: https://gist.github.com/terrajobst/ee86ab15d1d7a1d5869d1c1f2443f3b3#file-sample1-<i>cs
[4] sample2.cs: https://gist.github.com/terrajobst/8e077db206883ca156dfdb7643969c76#file-<i>sample2-cs
[5] sample3.cs: https://gist.github.com/terrajobst/568dad7aa8e831cf4fcb48ca370ca251#file-sample3-cs
[6] sample4.cs: https://gist.github.com/terrajobst/aed8731297b8e8268ae6a37ebfc33146#file-sample4-cs
[7] sample5.cs: https://gist.github.com/terrajobst/7e04b424ab279e711eece8f6b1c233d8#file-sample5-cs
[8] sample6.cs: https://gist.github.com/terrajobst/6e1bea5bec4591edd7c5fe5416ce7f56#file-sample6-cs
[9] MemoryPoolT: https://docs.microsoft.com/en-us/dotnet/api/system.buffers.memorypool-1?view=netcore-2.1
[10] IMemoryOwnerT: https://docs.microsoft.com/en-us/dotnet/api/system.buffers.imemoryowner-1?view=netcore-2.1
[11] MemoryManagerT: https://docs.microsoft.com/en-us/dotnet/api/system.buffers.memorymanager-1?view=netcore-2.1
[12] ArrayPoolT: https://docs.microsoft.com/en-us/dotnet/api/system.buffers.arraypool-1?view=netcore-2.1
[13] IBufferWriterT: https://docs.microsoft.com/en-us/dotnet/api/system.buffers.ibufferwriter-1?view=netcore-2.1
[14] IValueTaskSource: https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.sources.ivaluetasksource-1?view=netcore-2.1
[15] ValueTaskT: https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.valuetask-1?view=netcore-2.1
[16] здесь: https://github.com/dotnet/corefx/issues/27445
[17] здесь: https://github.com/davidfowl/TcpEcho
[18] Источник: https://habr.com/post/423105/?utm_source=habrahabr&utm_medium=rss&utm_campaign=423105
Нажмите здесь для печати.