System.IO.Pipelines: высокоэффективный IO в .NET

в 7:00, , рубрики: .net, C#, csharp, dotnet, microsoft, pipelines, System.IO.Pipelines, Блог компании Microsoft, Программирование

System.IO.Pipelines — это новая библиотека, упрощающая организацию кода в .NET. Трудно обеспечить высокую производительность и точность, если приходится иметь дело со сложным кодом. Задача System.IO.Pipelines — упростить код. Подробнее под катом!

System.IO.Pipelines: высокоэффективный IO в .NET - 1

Библиотека появилась в результате усилий команды разработчиков .NET Core, которые стремились сделать Kestrel одним из самых быстрых веб-серверов в отрасли. Она изначально задумывалась как часть реализации Kestrel, но превратилась в повторно используемый API, доступный в версии 2.1 в качестве BCL API первого класса (System.IO.Pipelines).

Какие проблемы она решает?

Чтобы правильно анализировать данные из потока или сокета, требуется написать большой объем стандартного кода. При этом существует множество подводных камней, которые усложняют и сам код, и его поддержку.

Какие сложности возникают сегодня?

Начнем с простой задачи. Нам необходимо написать TCP-сервер, который получает от клиента сообщения с разделителями строк (n).

Сервер TCP с NetworkStream

ОТСТУПЛЕНИЕ: как и в любой задаче, требующей высокой производительности, каждый конкретный случай стоит рассматривать исходя из особенностей вашего приложения. Возможно, тратить ресурсы на использование различных подходов, о которых пойдет речь далее, не имеет смысла, если масштаб сетевого приложения не очень велик.

Обычный код .NET перед использованием конвейеров выглядит примерно так:

	async Task ProcessLinesAsync(NetworkStream stream)
	{
	    var buffer = new byte[1024];
	    await stream.ReadAsync(buffer, 0, buffer.Length);
	    
	    // Process a single line from the buffer
	    ProcessLine(buffer);
	}

см. sample1.cs на GitHub

Вероятно, этот код будет работать при локальном тестировании, но он имеет ряд ошибок:

  • Возможно, после одного вызова ReadAsync не будет получено целое сообщение (до конца строки).
  • Он игнорирует результат работы метода stream.ReadAsync() — количество данных, фактически переданных в буфер.
  • Код не обрабатывает прием нескольких строк в одном вызове ReadAsync.

Это наиболее распространенные ошибки чтения потоковых данных. Чтобы их избежать, необходимо внести ряд изменений:

  • Нужно буферизовать входящие данные, пока не будет найдена новая строка.
  • Необходимо проанализировать все строки, возвращаемые в буфер.

	async Task ProcessLinesAsync(NetworkStream stream)
	{
	    var buffer = new byte[1024];
	    var bytesBuffered = 0;
	    var bytesConsumed = 0;
	

	    while (true)
	    {
	        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
	        if (bytesRead == 0)
	        {
	            // EOF
	            break;
	        }
	        // Keep track of the amount of buffered bytes
	        bytesBuffered += bytesRead;
	        
	        var linePosition = -1;
	

	        do
	        {
	            // Look for a EOL in the buffered data
	            linePosition = Array.IndexOf(buffer, (byte)'n', bytesConsumed, bytesBuffered - bytesConsumed);
	

	            if (linePosition >= 0)
	            {
	                // Calculate the length of the line based on the offset
	                var lineLength = linePosition - bytesConsumed;
	

	                // Process the line
	                ProcessLine(buffer, bytesConsumed, lineLength);
	

	                // Move the bytesConsumed to skip past the line we consumed (including n)
	                bytesConsumed += lineLength + 1;
	            }
	        }
	        while (linePosition >= 0);
	    }
	}

см. sample2.cs на GitHub

Повторюсь: это могло бы сработать при локальном тестировании, но иногда встречаются строки длиной больше 1 Кб (1024 байта). Необходимо увеличить размер входного буфера до тех пор, пока не будет найдена новая строка.

Кроме того, мы собираем буферы в массив при обработке длинных строк. Мы можем улучшить этот процесс с помощью ArrayPool, что позволит избежать повторного распределения буферов во время анализа длинных строк, поступающих от клиента.

	async Task ProcessLinesAsync(NetworkStream stream)
	{
	    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
	    var bytesBuffered = 0;
	    var bytesConsumed = 0;
	

	    while (true)
	    {
	        // Calculate the amount of bytes remaining in the buffer
	        var bytesRemaining = buffer.Length - bytesBuffered;
	

	        if (bytesRemaining == 0)
	        {
	            // Double the buffer size and copy the previously buffered data into the new buffer
	            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
	            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
	            // Return the old buffer to the pool
	            ArrayPool<byte>.Shared.Return(buffer);
	            buffer = newBuffer;
	            bytesRemaining = buffer.Length - bytesBuffered;
	        }
	

	        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
	        if (bytesRead == 0)
	        {
	            // EOF
	            break;
	        }
	        
	        // Keep track of the amount of buffered bytes
	        bytesBuffered += bytesRead;
	        
	        do
	        {
	            // Look for a EOL in the buffered data
	            linePosition = Array.IndexOf(buffer, (byte)'n', bytesConsumed, bytesBuffered - bytesConsumed);
	

	            if (linePosition >= 0)
	            {
	                // Calculate the length of the line based on the offset
	                var lineLength = linePosition - bytesConsumed;
	

	                // Process the line
	                ProcessLine(buffer, bytesConsumed, lineLength);
	

	                // Move the bytesConsumed to skip past the line we consumed (including n)
	                bytesConsumed += lineLength + 1;
	            }
	        }
	        while (linePosition >= 0);
	    }
	}

см. sample3.cs на GitHub

Код работает, но теперь изменился размер буфера, в результате появляется множество его копий. Также используется больше памяти, поскольку логика не сокращает буфер после обработки строк. Чтобы этого избежать, можно сохранять список буферов, а не менять каждый раз размер буфера при поступлении строк длиннее 1 Кб.

Кроме того, мы не увеличиваем буфер размером 1 Кб, пока он полностью не опустеет. Это значит, что мы будем передавать в ReadAsync буферы все меньшего размера, в результате возрастет число вызовов операционной системы.

Мы постараемся исключить это и будем выделять новый буфер, как только размер существующего станет меньше 512 байт:

	public class BufferSegment
	{
	    public byte[] Buffer { get; set; }
	    public int Count { get; set; }
	

	    public int Remaining => Buffer.Length - Count;
	}
	

	async Task ProcessLinesAsync(NetworkStream stream)
	{
	    const int minimumBufferSize = 512;
	

	    var segments = new List<BufferSegment>();
	    var bytesConsumed = 0;
	    var bytesConsumedBufferIndex = 0;
	    var segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };
	

	    segments.Add(segment);
	

	    while (true)
	    {
	        // Calculate the amount of bytes remaining in the buffer
	        if (segment.Remaining < minimumBufferSize)
	        {
	            // Allocate a new segment
	            segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };
	            segments.Add(segment);
	        }
	

	        var bytesRead = await stream.ReadAsync(segment.Buffer, segment.Count, segment.Remaining);
	        if (bytesRead == 0)
	        {
	            break;
	        }
	

	        // Keep track of the amount of buffered bytes
	        segment.Count += bytesRead;
	

	        while (true)
	        {
	            // Look for a EOL in the list of segments
	            var (segmentIndex, segmentOffset) = IndexOf(segments, (byte)'n', bytesConsumedBufferIndex, bytesConsumed);
	

	            if (segmentIndex >= 0)
	            {
	                // Process the line
	                ProcessLine(segments, segmentIndex, segmentOffset);
	

	                bytesConsumedBufferIndex = segmentOffset;
	                bytesConsumed = segmentOffset + 1;
	            }
	            else
	            {
	                break;
	            }
	        }
	

	        // Drop fully consumed segments from the list so we don't look at them again
	        for (var i = bytesConsumedBufferIndex; i >= 0; --i)
	        {
	            var consumedSegment = segments[i];
	            // Return all segments unless this is the current segment
	            if (consumedSegment != segment)
	            {
	                ArrayPool<byte>.Shared.Return(consumedSegment.Buffer);
	                segments.RemoveAt(i);
	            }
	        }
	    }
	}
	

	(int segmentIndex, int segmentOffest) IndexOf(List<BufferSegment> segments, byte value, int startBufferIndex, int startSegmentOffset)
	{
	    var first = true;
	    for (var i = startBufferIndex; i < segments.Count; ++i)
	    {
	        var segment = segments[i];
	        // Start from the correct offset
	        var offset = first ? startSegmentOffset : 0;
	        var index = Array.IndexOf(segment.Buffer, value, offset, segment.Count - offset);
	

	        if (index >= 0)
	        {
	            // Return the buffer index and the index within that segment where EOL was found
	            return (i, index);
	        }
	

	        first = false;
	    }
	    return (-1, -1);
	}

см. sample4.cs на GitHub

В итоге код существенно усложняется. Во время поиска разделителя мы отслеживаем заполненные буферы. Для этого используется List, который отображает буферизованные данные при поиске нового разделителя строк. В результате ProcessLine и IndexOf будут принимать List вместо byte[], offset и count. Логика синтаксического анализа начнет обрабатывать один сегмент буфера или несколько.

И теперь сервер будет обрабатывать частичные сообщения и использовать объединенную память, чтобы уменьшить общее потребление памяти. Однако нужно сделать еще ряд изменений:

  1. Из ArrayPoolbyte мы используем только Byte[] — стандартно управляемые массивы. Иными словами, при выполнении функции ReadAsync или WriteAsync срок действия буферов привязывается ко времени осуществления асинхронной операции (чтобы взаимодействовать с собственными API ввода-вывода операционной системы). Поскольку закрепленная память не может перемещаться, это сказывается на производительности сборщика мусора и может вызвать фрагментацию массива. Возможно, реализацию пула придется изменить, в зависимости от того, как долго асинхронные операции будут ожидать исполнения.
  2. Пропускную способность можно улучшить, если разорвать связь между логикой чтения и обработки. Мы получаем эффект пакетной обработки, и теперь логика синтаксического анализа сможет считывать большие объемы данных, обрабатывая большие блоки буферов, а не анализируя отдельные строки. В результате код усложняется еще больше:
    • Необходимо создать два цикла, работающих независимо друг от друга. Первый будет считывать данные из сокета, а второй — анализировать буферы.
    • Нужен способ сообщать логике синтаксического анализа, что данные становятся доступны.
    • Также необходимо определить, что произойдет, если цикл будет считывать данные из сокета слишком быстро. Нам нужен способ регулировать цикл считывания, если логика синтаксического анализа не поспевает за ним. Обычно это называют «управлением потоком» или «сопротивлением потоку».
    • Мы должны убедиться, что данные передаются безопасно. Теперь набор буферов используется и циклом считывания, и циклом синтаксического анализа, они работают независимо друг от друга на разных потоках.
    • Логика управления памятью также задействуется двумя разными фрагментами кода: заимствующим данные из буферного пула, который считывает данные из сокета, и возвращающим из буферного пула, который является логикой синтаксического анализа.
    • Нужно быть предельно осторожными с возвратом буферов после исполнения логики синтаксического анализа. Иначе есть вероятность того, что мы вернем буфер, в который все еще ведется запись логики чтения сокета.

Сложность начинает зашкаливать (а это далеко не все случаи!). Для создания высокопроизводительной сети нужно написать очень сложный код.

Цель System.IO.Pipelines — упростить эту процедуру.

TCP-сервер и System.IO.Pipelines

Давайте посмотрим, как работает System.IO.Pipelines:

	async Task ProcessLinesAsync(Socket socket)
	{
	    var pipe = new Pipe();
	    Task writing = FillPipeAsync(socket, pipe.Writer);
	    Task reading = ReadPipeAsync(pipe.Reader);
	

	    return Task.WhenAll(reading, writing);
	}
	

	async Task FillPipeAsync(Socket socket, PipeWriter writer)
	{
	    const int minimumBufferSize = 512;
	

	    while (true)
	    {
	        // Allocate at least 512 bytes from the PipeWriter
	        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
	        try 
	        {
	            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
	            if (bytesRead == 0)
	            {
	                break;
	            }
	            // Tell the PipeWriter how much was read from the Socket
	            writer.Advance(bytesRead);
	        }
	        catch (Exception ex)
	        {
	            LogError(ex);
	            break;
	        }
	

	        // Make the data available to the PipeReader
	        FlushResult result = await writer.FlushAsync();
	

	        if (result.IsCompleted)
	        {
	            break;
	        }
	    }
	

	    // Tell the PipeReader that there's no more data coming
	    writer.Complete();
	}
	

	async Task ReadPipeAsync(PipeReader reader)
	{
	    while (true)
	    {
	        ReadResult result = await reader.ReadAsync();
	

	        ReadOnlySequence<byte> buffer = result.Buffer;
	        SequencePosition? position = null;
	

	        do 
	        {
	            // Look for a EOL in the buffer
	            position = buffer.PositionOf((byte)'n');
	

	            if (position != null)
	            {
	                // Process the line
	                ProcessLine(buffer.Slice(0, position.Value));
	                
	                // Skip the line + the n character (basically position)
	                buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
	            }
	        }
	        while (position != null);
	

	        // Tell the PipeReader how much of the buffer we have consumed
	        reader.AdvanceTo(buffer.Start, buffer.End);
	

	        // Stop reading if there's no more data coming
	        if (result.IsCompleted)
	        {
	            break;
	        }
	    }
	

	    // Mark the PipeReader as complete
	    reader.Complete();
	}

см. sample5.cs на GitHub

В конвейерной версии нашего считывателя строк есть два цикла:

  • FillPipeAsync считывает из сокета и записывает в PipeWriter.
  • ReadPipeAsync считывает из PipeReader и анализирует входящие строки.

В отличие от первых примеров, здесь нет специально назначенных буферов. Это одна из основных функций System.IO.Pipelines. Все задачи по управлению буферами передаются реализациям PipeReader/PipeWriter.

Процедура упрощается: мы используем код только для бизнес-логики вместо того, чтобы реализовывать сложное управление буферами.

В первом цикле сначала вызывается PipeWriter.GetMemory(int), чтобы получить определенный объем памяти от основного записывателя. Затем вызывается PipeWriter.Advance(int), который сообщает PipeWriter, сколько данных фактически записано в буфер. После этого следует вызов PipeWriter.FlushAsync(), чтобы PipeReader получил доступ к данным.

Второй цикл потребляет буферы, которые были записаны PipeWriter, но изначально поступили от сокета. Когда возвращается запрос к PipeReader.ReadAsync(), мы получаем ReadResult, содержащий два важных сообщения: данные, считанные в форме ReadOnlySequence, а также логический тип данных IsCompleted, который сообщает считывателю, закончил ли записыватель работу (EOF). Когда будет найден разделитель конца строки (EOL) и проанализирована строка, мы разделим буфер на части, чтобы пропустить уже обработанный фрагмент. После этого вызывается PipeReader.AdvanceTo, и он сообщает PipeReader, сколько данных было потреблено.

В конце каждого цикла завершается работа и считывателя, и записывателя. В результате основной канал высвобождает всю выделенную память.

System.IO.Pipelines

Частичное чтение

Кроме управления памятью System.IO.Pipelines выполняет другую важную функцию: просматривает данные в канале, но не потребляет их.

У PipeReader есть два основных API: ReadAsync и AdvanceTo. ReadAsync получает данные из канала, AdvanceTo сообщает PipeReader о том, что эти буферы больше не требуются считывателю, поэтому от них можно избавиться (например, вернуть в основной буферный пул).

Ниже приведен пример анализатора HTTP, который считывает данные из буферов частичных данных канала, пока не получит подходящую начальную строку.

System.IO.Pipelines: высокоэффективный IO в .NET - 2

ReadOnlySequenceT

Реализация канала хранит список связанных буферов, передающихся между PipeWriter и PipeReader. PipeReader.ReadAsync раскрывает ReadOnlySequence, являющийся новым типом BCL и состоящий из одного или нескольких сегментов ReadOnlyMemory<Т>. Он похож на Span или Memory, что дает нам возможность взглянуть на массивы и строки.

System.IO.Pipelines: высокоэффективный IO в .NET - 3

Внутри канала есть указатели, которые показывают, где в общем наборе выделенных данных располагаются считыватель и записыватель, а также обновляют их по мере записи и чтения данных. SequencePosition представляет собой единую точку в связанном списке буферов и используется для эффективного разделения ReadOnlySequence<Т>.

Поскольку ReadOnlySequence<Т> поддерживает один сегмент и более, то стандартной операцией высокопроизводительной логики является разделение быстрых и медленных путей исходя из количества сегментов.

В качестве примера приведем функцию, преобразующую ASCII ReadOnlySequence в строку:

	string GetAsciiString(ReadOnlySequence<byte> buffer)
	{
	    if (buffer.IsSingleSegment)
	    {
	        return Encoding.ASCII.GetString(buffer.First.Span);
	    }
	

	    return string.Create((int)buffer.Length, buffer, (span, sequence) =>
	    {
	        foreach (var segment in sequence)
	        {
	            Encoding.ASCII.GetChars(segment.Span, span);
	

	            span = span.Slice(segment.Length);
	        }
	    });
	}

см. sample6.cs на GitHub

Сопротивление потоку и управление потоком

В идеале чтение и анализ работают совместно: поток чтения потребляет данные из сети и помещает их в буферы, в то время как поток анализа создает подходящие структуры данных. Обычно анализ занимает больше времени, чем простое копирование блоков данных из сети. В результате поток чтения может с легкостью перегрузить поток анализа. Поэтому поток чтения будет вынужден либо замедлить работу, либо потреблять больше памяти, чтобы сохранять данные для потока анализа. Чтобы обеспечить оптимальную производительность, необходим баланс между частотой пауз и выделением большого объема памяти.

Для решения этой проблемы конвейер имеет две функции управления потоком данных: PauseWriterThreshold и ResumeWriterThreshold. PauseWriterThreshold определяет, сколько данных необходимо буферизовать до приостановки PipeWriter.FlushAsync. ResumeWriterThreshold определяет, сколько памяти может потребить считыватель до возобновления работы записывателя.

System.IO.Pipelines: высокоэффективный IO в .NET - 4

PipeWriter.FlushAsync «блокируется», когда количество данных в конвейерном потоке превысит лимит, установленный в PauseWriterThreshold, и «разблокируется», когда оно станет ниже установленного в ResumeWriterThreshold. Чтобы предотвратить превышение лимита потребления, используются всего два значения.

Планирование ввода-вывода

При использовании async/await последующие операции обычно вызываются либо в потоках пула, либо в текущем SynchronizationContext.

При осуществлении ввода-вывода очень важно тщательно контролировать, где он выполняется, чтобы эффективнее использовать кэш процессора. Это имеет критическое значение для высокопроизводительных приложений, таких как веб-серверы. System.IO.Pipelines использует PipeScheduler, чтобы определить место выполнения асинхронных ответных вызовов. Это позволяет очень точно контролировать, какие потоки использовать для ввода-вывода.

Пример практического применения — транспорт Kestrel Libuv, в котором обратные вызовы ввода-вывода выполняются по выделенным каналам цикла событий.

Есть и другие преимущества шаблона PipeReader

  • Некоторые базовые системы поддерживают «ожидание без буферизации»: буфер не нужно выделять то тех пор, пока в базовой системе не появятся доступные данные. Так, в Linux с epoll можно не предоставлять буфер для считывания до тех пор, пока данные не будут подготовлены. Это позволяет избежать ситуации, когда имеется множество потоков, ожидающих данные, и требуется сразу же резервировать огромный объем памяти.
  • Конвейер по умолчанию упрощает запись модульных тестов сетевого кода: логика синтаксического анализа отделена от сетевого кода, и модульные тесты запускают эту логику только в буферах в памяти, а не потребляют ее непосредственно из сети. Он также упрощает тестирование сложных шаблонов с отправкой частичных данных. ASP.NET Core использует его для проверки различных аспектов http-средств синтаксического анализа Kestrel.
  • Системы, позволяющие пользовательскому коду задействовать основные буферы ОС (например, зарегистрированные API ввода-вывода Windows), изначально подходят для использования конвейеров, поскольку реализация PipeReader всегда предоставляет буферы.

Другие связанные типы

Мы также добавили в System.IO.Pipelines ряд новых простых типов BCL:

  • MemoryPoolT, IMemoryOwnerT, MemoryManagerT. В .NET Core 1.0 был добавлен ArrayPoolT, а в .NET Core 2.1 теперь имеется более общее абстрактное представление для пула, который работает с любыми MemoryT. Мы получаем точку расширяемости, позволяющую осуществлять более продвинутые стратегии распределения, а также контролировать управление буферами (например, использовать предустановленные буферы вместо исключительно управляемых массивов).
  • IBufferWriterT представляет собой приемник для записи синхронных буферизованных данных (реализуется PipeWriter).
  • IValueTaskSourceValueTaskT существует со времени выпуска .NET Core 1.1, но в .NET Core 2.1 приобрел чрезвычайно эффективные инструменты, обеспечивающие бесперебойные асинхронные операции без распределения. Дополнительную информацию см. здесь.

Как использовать конвейеры?

API находятся в nuget-пакете System.IO.Pipelines.

Пример приложения сервера .NET Server 2.1, использующего конвейеры для обработки строчных сообщений (из примера выше) см. здесь. Его можно запустить с помощью dotnet run (или Visual Studio). В примере ожидается передача данных от сокета на порту 8087, затем полученные сообщения записываются на консоль. Для подключения к порту 8087 можно использовать клиент, например netcat или putty. Отправьте строчное сообщение и посмотрите, как это работает.

На данный момент конвейер работает в Kestrel и SignalR, и мы надеемся, что она найдет более широкое применение во множестве сетевых библиотек и компонентов сообщества .NET в будущем.

Автор: Александр Гуреев

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js