Сортировка слиянием — не так просто, как кажется

в 8:28, , рубрики: .net, C#, Unicode, Алгоритмы, внешняя сортировка, оптимизация, Программирование, сортировка слиянием

В одной конторе соискателю на позицию Senior C# developer выдали тестовое задание: отсортировать файл со строками определенного формата.

Требования такие:

  • Формат строки: число, точка, пробел, далее любые символы до конца строки.

  • Порядок сортировки — сначала сортируем текстовой части строки, потом по числу если текстовые части совпадают.

  • Кодировка — UTF-8.

  • Размер файла — 100гб - гарантированно больше объема ОП.

  • Должно отработать за 1 час на машине проверяющего, вряд ли там будет супер-быстрый SSD и огромное количество оперативной памяти.

Как и многие другие программисты, узнав о таком тестовом задании, я возмутился. Внешнюю сортировку слиянием практически всех проходили в ВУЗе, но практически никто никогда не писал её. Задача очень непрактическая и непонятно какие навыки проверяет. Так мне казалось.

Эта задача вызвала бурные обсуждения о способах её решения. Многие программисты, причисляющие себя к рангу senior, предложили использовать базы данных, ибо не барское это дело - вручную писать алгоритмы сортировки. Некоторые даже попытались сделать решение на Apache Spark. Однако никто до конца задачу не решил, ибо мало кому удалось отсортировать в нужном порядке даже 10ГБ файл менее чем за 15 минут без SSD.

Я подумал, что стоит решить задачу до конца с помощью программирования, и тоже причислить себя к рангу senior developer.

В первую очередь я написал генератор тестового файла, который генерирует нужное количество строк из исходного файла. В качестве исходного взял первый том Войны и Мира, так как там есть как русские, так и английские символы.

Код генератора
var source = (from l in File.ReadLines("source.txt")
              where !string.IsNullOrEmpty(l)
              from s in l.Split(new[] { '.', '?', '!', '[', ']' }, StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
              where s.Length > 10
              select s).ToList();

Random rand = new();

using (var f = File.CreateText(file))
{
    f.AutoFlush = false;
    while(f.BaseStream.Position < maxSize)
    {
        var n = rand.Next();
        f.Write(n);
        f.Write(". ");
        f.WriteLine(source[rand.Next(source.Count)]);
    }
}
return 0;

Для начала решил сгенерировать 10ГБ, чтобы не ждать час на каждом тестовом прогоне. Кроме того файл такого размера не помещается в кэши операционной системы и операции чтения-записи доходят до диска, что дает представление о реальном быстродействии на больших объемах.

Самое простое работающее решение

Все началось со статьи на хабре о внешней сортировке. Сразу отбросил идею нескольких прогонов для объединения блоков, так как это привело бы к дополнительным затратам на запись. Весь код разделил на две фазы — разбиение исходного файла на отдельные блоки (чанки, от английского chunk) и сортировка строк в блоках, слияние блоков в один файл.

Код разбиения:

var count = 0;
var tempFiles =
    File.ReadLines(file)
        .Select(s => new Item(s, s.IndexOf('.')))
        .Chunk(chunkSize)
        .Select(chunk =>
        {
            Array.Sort(chunk, comparer);
            var tempFileName = Path.ChangeExtension(file, $".part-{count++}" + Path.GetExtension(file));
            File.WriteAllLines(tempFileName, chunk.Select(x => x.Line));
            return tempFileName;
        }).ToList();

Код слияния:

try
{
    var mergedLines = tempFiles
        .Select(f => File.ReadLines(f).Select(s => new Item(s, s.IndexOf('.'))))
        .Merge(comparer) // IEnumerable<IEnumerable<T>> -> IEnumerable<T>
        .Select(x => x.Line);
    File.WriteAllLines(Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file)), mergedLines);
}
finally
{
    tempFiles.ForEach(File.Delete);
}

Для того, чтобы удобнее писать код, определил тип, содержащий строку и позицию точки в строке и компаратор для этого типа:

public record struct Item(string Line, int DotPosition);
public record Comparer(StringComparison StringComparison) : IComparer<Item>
{
    public int Compare(Item x, Item y)
    {
        var spanX = x.Line.AsSpan();
        var spanY = y.Line.AsSpan();
        var xDot = x.DotPosition;
        var yDot = y.DotPosition;

        var cmp = spanX[(xDot + 2)..].CompareTo(spanY[(yDot + 2)..], StringComparison);
        if (cmp != 0) return cmp;
        return int.Parse(spanX[..xDot]) - int.Parse(spanY[..yDot]);
    }
}

"Сердце" всего алгоритма внешней сортировки - слияние итераторов:

public static IEnumerable<T> Merge<T>(this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer = default)
{
    var enumerators = (from source in sources
                        let e = source.GetEnumerator()
                        where e.MoveNext()
                        select e).ToList();
            
    while (enumerators.Count > 0)
    {
        var min = enumerators.MinBy(e => e.Current, comparer)!;
        yield return min.Current;
        if (!min.MoveNext())
        {
            min.Dispose();
            enumerators.Remove(min);
        }
    }
}

Почему я не использовал asyncawait? Ведь сейчас все программисты C# втыкают asyncawait на автомате. Конечно я тоже так сделал сначала, но потом убрал.

Во-первых для асинхронных итераторов сложнее написать Merge. Во-вторых код с asyncawait медленнее работал. asyncawait несет дополнительные расходы на переключение контекста, продолжения вызывают всю цепочку асинхронных методов. Это может быть выгодно когда нам надо распараллелить ожидание, но в этом коде никаких параллельных ожиданий нет. Все операции происходят последовательно.

Первый запуск

Запустил сортировку слиянием, размер чанка - 1М строк или около 157Мб, время работы - 15:30, пятнадцать с половиной минут! В час для 100Гб уложиться не выйдет.

Что по вашему тормозило в этом коде больше всего? Напишите свой вариант в комментариях, прежде чем разворачивать спойлер и читать дальше.

Тайминг
SplitSort done in 00:04:59.2942000
Merge done in 00:10:32.1238153

Диспетчер задач показывал, что во время сортировки ресурсы компьютера задействуются очень мало:

Нагрузка на процессор в фазе разбиения (ЦП7 выполнял код)
Нагрузка на процессор в фазе разбиения (ЦП7 выполнял код)
Нагрузка на диск в фазе разбиения
Нагрузка на диск в фазе разбиения

Код по ссылке

Оптимизируем слияние

Дольше всего выполняется не чтение или запись, а поиск минимального элемента во время слияния. Этот код я честно написал сам, не подсматривая в готовые решения. Гораздо эффективнее будет отсортировать итераторы один раз, а далее поддерживать их отсортированность после вызова .MoveNext(), даже на StackOverflow предлагают такой вариант.

Лучше всего подойдет двоичная (она же бинарная) куча. Она имеет минимальный элемент в корне и позволяет восстановить отсортированность за O(logN), где K - количество элементов в куче (у нас равно числу чанков). Естественно это я не сам придумал, а подсмотрел в интернете.

Методы работы с кучей
public static void Heapify<T>(this Span<T> heap, int index, IComparer<T> comparer)
{
    ArgumentNullException.ThrowIfNull(comparer);

    var min = index;
    while (true)
    {
        var leftChild = 2 * index + 1;
        var rightChild = 2 * index + 2;
        var v = heap[index];

        if (rightChild < heap.Length && comparer.Compare(v, heap[rightChild]) > 0)
        {
            min = rightChild;
            v = heap[min];
        }

        if (leftChild < heap.Length && comparer.Compare(v, heap[leftChild]) > 0)
        {
            min = leftChild;
        }

        if (min == index) break;

        var temp = heap[index];
        heap[index] = heap[min];
        heap[min] = temp;

        index = min;
    }
}

public static void BuildHeap<T>(this Span<T> heap, IComparer<T> comparer)
{
    ArgumentNullException.ThrowIfNull(comparer);

    for (int i = heap.Length / 2; i >= 0; i--)
    {
        Heapify(heap, i, comparer);
    }
}

Код метода слияния:

public static IEnumerable<T> Merge<T>(this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer = default)
{
    var heap = (from source in sources
                let e = source.GetEnumerator()
                where e.MoveNext()
                select e).ToArray();

    var enumeratorComparer = new EnumeratorComparer<T>(comparer ?? Comparer<T>.Default);
    heap.AsSpan().BuildHeap(enumeratorComparer);

    while (true)
    {
        var min = heap[0];
        yield return min.Current;
        if (!min.MoveNext())
        {
            min.Dispose();
            if (heap.Length == 1) yield break;
            heap[0] = heap[^1];
            Array.Resize(ref heap, heap.Length - 1);
        }
        heap.AsSpan().Heapify(0, enumeratorComparer);
    }
}

private record EnumeratorComparer<T>(IComparer<T> comparer) : IComparer<IEnumerator<T>>
{
    public int Compare(IEnumerator<T>? x, IEnumerator<T>? y)
    {
        return comparer.Compare(x!.Current, y!.Current);
    }
}

Остальной код программы не изменился. Время работы:

SplitSort done in 00:04:27.8391844
Merge done in 00:02:11.4364005

Значительно лучше, но до заветного часа на 100ГБ еще очень далеко. Тут стоит обратить внимание, что из-за кэша файловой системы время работы может варьироваться +-15%

Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/heapsort

Оптимизируем разбиение

Фазы разбиения и слияния выполняют одинаковое количество чтения-записи, создают одинаковое количество объектов типа string, но фаза разбиения использует в 2,5 раз больше памяти и запуск под отладчиком показывает множество сборок мусора.

Все дело во времени жизни объектов. В фазе слияния объект строки живет от чтения из чанка до записи в результирующий файл. Когда считывается следующая строка из чанка предыдущая уже превратилась мусор. Мусор убирается в нулевом поколении сборщика, это происходит быстро и память не растет.

В фазе разбиения объекты строк живут от чтения из исходного файла до записи в чанк. Большинство объектов строк переживает несколько сборок мусора, что создает повышенную активность сборщика и увеличивает потребляемую память.

Мы не можем уменьшить время жизни строк на фазе разбиения. Но их можно вообще не создавать! Можно прочитать из файла блок символов, разделить по символу перевода строки и использовать вместо строк тип ReadOnlyMemory<char>, который предоставляет ту же функциональность. ReadOnlyMemory<char> это структура (не требует аллокаций в управляемой куче), которая представляет из себя ссылку на массив, смещение и длину.

Код разбиения без аллокаций:

List<string> tempFiles = new();
List<Item> chunk = new();
using (var reader = File.OpenText(file))
{
    var chunkBuffer = new char[chunkSize];
    var chunkReadPosition = 0;
    var eos = reader.EndOfStream;
    while (!eos)
    {
        // Читаем из файла весь буфер
        var charsRead = reader.ReadBlock(chunkBuffer.AsSpan(chunkReadPosition));
        eos = reader.EndOfStream;
        var m = chunkBuffer.AsMemory(0, chunkReadPosition + charsRead);

        // Заполняем список строк ReadOnlyMemory<char> для сортировки
        int linePos;
        while ((linePos = m.Span.IndexOf(Environment.NewLine)) >= 0 || (eos && m.Length > 0))
        {
            var line = linePos >= 0 ? m[..linePos] : m;
            chunk.Add(new Item(line, line.Span.IndexOf('.')));
            m = m[(linePos + Environment.NewLine.Length)..];
        }

        chunk.Sort(comparer);

        // Записываем строки из отсортированного списка во временный файл
        var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}" + Path.GetExtension(file));
        using (var tempFile = File.CreateText(tempFileName))
        {
            foreach (var (l, _) in chunk)
            {
                tempFile.WriteLine(l);
            }
        }
        tempFiles.Add(tempFileName);

        if (eos) break;
        chunk.Clear();

        //Отсток буфера переносим в начало
        m.CopyTo(chunkBuffer);
        chunkReadPosition = m.Length;
    }
}

Можно было бы оставить код в функциональном стиле, но тогда код получился бы более неуклюжим из-за необходимости передачи флага конца файла.

В структурах данных заменил string наReadOnlyMemory<char>и больше ничего не изменилось.

Время работы при размере чанка в 100М символов, 161Мб на диске:

SplitSort done in 00:03:50.6780519
Merge done in 00:02:19.5627238

Удалось выиграть еще 30 сек и сократить расход памяти на фазе разбиения со 600 до 250 мегабайт. Как говорится Allocation is cheap… until it is not (статья от другом, но заголовок подходит).

Код по ссылке

К сожалению на этом все простые оптимизации кончились, а суммарное время работы все еще не позволит уложиться в час.

Как сравнивать строки

Для многих программистов сравнение строк это все еще посимвольное, а для тех кто пришел из С — побайтное сравнение. Но примерно с 2000 года все используют юникод. Юникод это не просто два байта на символ и кодировки переменной длины, вроде UTF8, это еще правила сравнения, нормализации и подсчета символов. Кто еще не в курсе - посмотрите доклад Plain Text Дилана Битти на NDC. Это один из лучших докладов за всю историю конференций.

Сравнение юникодных строк описано в стандарте Unicode Collation Algorithm (UCA). Это очень сложный алгоритм, который опирается на таблицы весов символом для разных культур. Этот алгоритм реализован в операционной системе (CompareStringW, CompareStringEx в Windows и CompareString из libSystem.Globalization.Native.so в Linux).

Конечно можно от этого всего отказаться и сравнивать строки посимвольно, это ускорит сортировку почти на минуту, так как .NET не использует системные API для этого. Достаточно указать StringComparison.Ordinal в Comparer. Кроме того, отказ от UCA позволяет использовать поразрядные (radix) алгоритмы сортировки, которые должны работать быстрее обычных. Но изменит порядок сортировки и фактически является оптимизацией под один частный случай. Не будет простых способов вернуться к UCA без потери быстродействия.

Один из шагов UCA — получение ключа сортировки (sort key) для строк — простого массива байт, который можно использовать для побайтного сравнения. Оказывается в .NET есть функция получения ключа сортировки строк CompareInfo.GetSortKey. То есть мы можем получить эти байты и потом сравнивать их. Если дописать в конец полученного массива байты числа, стоящего в начале, то мы можем всю сортировку свести к сортировке байтовых массивов.

Скоро 15 лет как я программирую на .NET и я узнал о наличии ключей сортировки строк и соответствующих классов только когда решал эту задачу.

Пытаемся оптимизировать сортировку

Для начала добавим получение ключей и сортировку по ним в методы разбиения и слияния:

List<string> tempFiles = new();
List<Item> chunk = new();
using (var reader = File.OpenText(file))
{
    var keyBuffer = new byte[chunkSize * 2]; //Буфер для ключей
    var chunkBuffer = new char[chunkSize];
    var chunkReadPosition = 0;
    var eos = reader.EndOfStream;
    while (!eos)
    {
        // Читаем из файла весь буфер
        var charsRead = reader.ReadBlock(chunkBuffer.AsSpan(chunkReadPosition));
        eos = reader.EndOfStream;
        var m = chunkBuffer.AsMemory(0, chunkReadPosition + charsRead);
        var key = keyBuffer.AsMemory();

        // Заполняем список строк ReadOnlyMemory<char> для сортировки
        int linePos;
        while ((linePos = m.Span.IndexOf(Environment.NewLine)) >= 0 || (eos && m.Length > 0))
        {
            var line = linePos >= 0 ? m[..linePos] : m;
            var s = line.Span;
            var dot = line.Span.IndexOf('.');
            int x = int.Parse(s[..dot]);
            s = s[(dot + 2)..];
            var keyLen = culture.CompareInfo.GetSortKey(s, key.Span);    // Получаем ключ
            BinaryPrimitives.WriteInt32BigEndian(key[keyLen..].Span, x); // Добписываем число в конец ключа, чтобы старшый байт был с меньшим индексом
            keyLen += sizeof(int);

            chunk.Add(new Item(line, key[..keyLen]));
            m = m[(linePos + Environment.NewLine.Length)..];
            key = key[keyLen..];
        }

        chunk.Sort(comparer);

        // Записываем строки из отсортированного списка во временный файл
        var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}" + Path.GetExtension(file));
        using (var tempFile = File.CreateText(tempFileName))
        {
            foreach (var (l, _) in chunk)
            {
                tempFile.WriteLine(l);
            }
        }
        tempFiles.Add(tempFileName);

        if (eos) break;
        chunk.Clear();

        //Остаток буфера переносим в начало
        m.CopyTo(chunkBuffer);
        chunkReadPosition = m.Length;
    }
}

При слиянии нам также надо получать ключи:

try
{
    var mergedLines = tempFiles
        .Select(f => File.ReadLines(f).Select(s => // Читаем построчно все файлы 
        {
            var m = s.AsMemory();
            var dot = s.IndexOf('.');              // Находим в строках точку
            int x = int.Parse(s.AsSpan(0, dot));

            // Получаем ключ того, что находится после точки с пробелом
            var key = new byte[s.Length * 2 + sizeof(int)];
            var keyLen = culture.CompareInfo.GetSortKey(m[(dot + 2)..].Span, key); 
            
            // Дописываем число в конец
            BinaryPrimitives.WriteInt32BigEndian(key.AsSpan(keyLen), x);         
            return new Item(m, key);
        }))
        .Merge(comparer);  //Слияние итераторов IEnumerable<IEnumerable<T>> в IEnumerable<T>
  
    using var sortedFile = File.CreateText(Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file)));
    foreach (var (l, _) in mergedLines)
    {
        sortedFile.WriteLine(l);
    }
}
finally
{
    tempFiles.ForEach(File.Delete);
}

Компаратор теперь очень простой:

public record struct Item(ReadOnlyMemory<char> Line, ReadOnlyMemory<byte> Key);
public class Comparer : IComparer<Item>
{
    public int Compare(Item x, Item y)
    {
        return x.Key.Span.SequenceCompareTo(y.Key.Span);
    }
}

Результаты ожидаемо хуже:

SplitSort done in 00:04:09.5091207
Merge done in 00:03:02.5646277

Мы проиграли 40 секунд на слиянии из-за получения ключей и 10 секунд на разбиении и сортировке. Сортировка ключей оказалась эффективнее, чем сортировка строк, но накладные расходы на получение ключей убили весь выигрыш.

Зато теперь можно применить поразрядную (Radix) сортировку ключей. Я написал два варианта поразрядной сортировки - Radix Quick Sort aka Multi-key QuickSort (просто перевел на C# алгоритм описанный в статье) и Counting Radix Sort (в основном скопировал код отсюда). К сожалению оба варианта проиграли стандартному Array.Sort(Код этих сортировок в статье не привожу, чтобы не забивать объем, но вы сможете найти его в исходниках вместе с бенчмарками по ссылке в конце статьи). Скорее всего потому, что сравнение блоков памяти методом SequenceCompareTo оптимизируется с помощью SIMD и работает гораздо быстрее, чем ручной код сравнения по разрядам.

Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/sort-key

На этом месте я устал и лег спать.

А что если сохранять ключи?

С этой мыслью я проснулся на следующий день.

  • Во-первых сохраняя ключи во временном файле мы можем не получать ключ сортировки через API в фазе слияния.

  • Во-вторых нам вообще даже не надо декодировать символы в фазе слияния, мы можем просто сохранять нужное количество байт в выходном файле.

  • В-третьих, спустившись на уровень файловых потоков (FileStream вместо StreamReader) мы сможем эффективнее управлять буферизацией.

Я сделал бенчмарк, где сравнил все способы построчного чтения файлов, где сравнил File.ReadLines, StreamReader, FileStream и различные варианты буферизации, а также модный молодежный PipeReader. Победил, ожидаемо, FileStream, как самый низкоуровневый инструмент. Кроме того если вы будете читать или записывать данные большими блоками, то выгодно отключать встроенную буферизацию .NET, а если маленькими, то указывать большой размер буфера (код бенчмарков по ссылке в конце статьи).

Много кода

Фаза разбиения

public void SplitSort()
{
    using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 0, FileOptions.SequentialScan);
    fileSize = stream.Length;

    List<SortKey> chunk = new();

    var keyBuffer = new byte[maxChunkSize];
    var readBuffer = new byte[maxChunkSize];
    var remainingBytes = 0;

    var charBuffer = new char[1024];
    var eof = false;
    while (!eof)
    {
        var bytesRead = stream.ReadBlock(readBuffer, remainingBytes, maxChunkSize - remainingBytes, out eof);
        int chunkSize = remainingBytes + bytesRead;
        if (!eof)
        {
            var lastNewLine = readBuffer.AsSpan(0, bytesRead).LastIndexOf(NewLine);
            if (lastNewLine >= 0) chunkSize = lastNewLine + NewLine.Length;
            remainingBytes = remainingBytes + bytesRead - chunkSize;
        }

        chunk.AddRange(ParseChunk(chunkSize, readBuffer, keyBuffer, charBuffer));

        //Сортируем и записываем чанки на диск
        chunk.Sort(comparer);
        WriteChunk(chunk);

        chunk.Clear();
        //Остаток буфера переносим в начало
        if (remainingBytes > 0) readBuffer.AsSpan(chunkSize, remainingBytes).CopyTo(readBuffer.AsSpan());
    }
}

Функция чтения строк и получения ключей сортировки

private IEnumerable<SortKey> ParseChunk(int byteCount, byte[] readBuffer, byte[] keyBuffer, char[] charBuffer)
{
    var readPos = 0;
    var key = keyBuffer.AsMemory();
    while (byteCount > 0)
    {
        var linePos = readBuffer.AsSpan(readPos, byteCount).IndexOf(NewLine);
        if (linePos == -1) linePos = byteCount;
        if (charBuffer.Length < linePos) charBuffer = new char[linePos];

        // Надо обязательно вызывать именно эту перегрузку, потому что остальные аллоцируют память
        var lineLen = encoding.GetChars(readBuffer, readPos, linePos, charBuffer, 0);
        var line = charBuffer.AsMemory(0, lineLen);
        var s = line.Span;
        var dot = s.IndexOf('.');
        var x = int.Parse(s[0..dot]);

        var keyLen = culture.CompareInfo.GetSortKey(s[(dot + 2)..], key.Span, compareOptions);
        BinaryPrimitives.WriteInt32BigEndian(key[keyLen..].Span, x);
        keyLen += sizeof(int);

        var lineSize = linePos + NewLine.Length;
        yield return new SortKey(readBuffer.AsMemory(readPos, lineSize), key[..keyLen]);
        key = key[keyLen..];

        readPos += lineSize;
        byteCount -= lineSize;
        maxLineSize = Math.Max(maxLineSize, lineSize);
        maxKeyLength = Math.Max(maxKeyLength, keyLen);
    }
}

Функция записи чанка на диск

void WriteChunk(List<SortKey> chunk)
{
    // Записываем строки из отсортированного списка во временный файл
    var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}.tmp");
    using var stream = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None, BufferSize, FileOptions.SequentialScan);
        
    Span<byte> buffer = stackalloc byte[sizeof(int)];
    foreach (var (line, key) in chunk)
    {
        BinaryPrimitives.WriteInt32LittleEndian(buffer, line.Length);
        stream.Write(buffer);
        stream.Write(line.Span);

        BinaryPrimitives.WriteInt32LittleEndian(buffer, key.Length);
        stream.Write(buffer);
        stream.Write(key.Span);
    }
    tempFiles.Add(tempFileName);
}

Фаза слияния

public void Merge()
{
    var mergedLines = tempFiles
        .Select(ReadTempFile) // Читаем построчно все файлы, находим в строках точку
        .Merge(comparer);  //Слияние итераторов IEnumerable<IEnumerable<T>> в IEnumerable<T>

    string sortedFileName = Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file));
    using var sortedFile = new FileStream(sortedFileName, FileMode.Create, FileAccess.Write, FileShare.None, BufferSize, FileOptions.SequentialScan);
    sortedFile.SetLength(fileSize);
    foreach (var (l, _) in mergedLines)
    {
        sortedFile.Write(l.Span);
    }
}

Чтение временного файла

private IEnumerable<SortKey> ReadTempFile(string file)
{
    using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, BufferSize, FileOptions.SequentialScan);

    var maxBlockSize = maxLineSize + maxKeyLength + sizeof(int) * 2;
    var readBuffer = new byte[Math.Max(BufferSize, maxBlockSize)];

    var bytesRemaining = 0;
    var eof = false;

    while (!eof)
    {
        var bytesRead = stream.ReadBlock(readBuffer, bytesRemaining, readBuffer.Length - bytesRemaining, out eof);
        if (bytesRead == 0) eof = true;
        var mem = readBuffer.AsMemory(0, bytesRemaining + bytesRead);

        while (mem.Length > maxBlockSize || (eof && mem.Length > 0))
        {

            var lineSize = BinaryPrimitives.ReadInt32LittleEndian(mem.Span);
            mem = mem[sizeof(int)..];

            var line = mem[..lineSize];
            mem = mem[lineSize..];

            var keyLen = BinaryPrimitives.ReadInt32LittleEndian(mem.Span);
            mem = mem[sizeof(int)..];

            yield return new SortKey(line, mem[..keyLen]);
            mem = mem[keyLen..];
        }

        mem.CopyTo(readBuffer);

        bytesRemaining = mem.Length;
    }
}

Из 25 строк кода в самом начале, написанных даже без классов и метода Main, всё превратилось в 150 строк без учета конструктора и полей класса.

Результаты забега при установке размера чанка в 100М байт. Так как теперь вместе со строками записываются ключи размер одного временного файла на диске составляет 180МБ.

SplitSort done in 00:04:12.8286312
Merge done in 00:03:05.3477665

Результат приблизительно равен предыдущему, но это при учете что теперь мы пишем и читаем не 10Гб временных файлов, а 18гб. В таск менеджере заметно, что быстродействие теперь сильно упирается в диск.

Если быстродействие сильно упирается в диск, то нужно данные сжать. Так мне говорила бабушка прочитал в книге по базам данных. Завернем FileStream в BrotliStream при записи и чтении временных файлов. Brotli — это новый алгоритм сжатия, который пока еще приходит в веб и другие аспекты разработки. Подробнее можно прочитать на википедии.

Результаты забега со сжатием

SplitSort done in 00:04:28.3044728
Merge done in 00:00:36.4300613

В сумме меньше 5 минут. Суммарный объем временных файлов на диске сократился до 970МБ, то есть почти в 20 раз. Это понятно, так как в файлах очень много повторяющихся строк. Возможно на других текстовых файлах результат будет не настолько выдающимся, но все равно написанные человеком или chatGpt тексты будут хороши сжиматься.

Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/sort-key-with-compression

Быстродействие теперь упирается не в диск, а в процессор. И это хорошо. Диск у нас один, а процессоров зачастую больше.

Распараллеливание

Сейчас программа выполняется последовательно:

  1. Чтение чанка (нагружает диск и не использует процессор)

  2. Парсинг строк и получение ключей (нагружает процессор в основном)

  3. Сортировка (сильно нагружает процессор)

  4. Сжатие данных (сильно нагружает процессор)

  5. Запись (сильно нагружает диск)

Было бы неплохо пункты 1 и 5 выполнять параллельно с 2-4.

Заведем пять отдельных потоков для каждой задачи. Для передачи чанков между потоками воспользуемся библиотекой System.Threading.Channels.

readToParse = Channel.CreateBounded<(byte[], int)>(1); // Буфер и размер
parseToSort = Channel.CreateBounded<(List<SortKey>, byte[], byte[])>(1);     // Список ключей, буфер строк и буфер ключей
sortToCompress = Channel.CreateBounded<(List<SortKey>, byte[], byte[])>(1)); // Список ключей, буфер строк и буфер ключей
compressToWrite = Channel.CreateBounded<(byte[], int)>(1); // Сжатые данные и размер

parserThreads =
    Enumerable
    .Range(0, degreeOfParallelism)
    .Select(_ => Task.Run(ParallelParser)).ToArray();

sorterThreads =
    Enumerable
    .Range(0, degreeOfParallelism)
    .Select(_ => Task.Run(ParallelSorter)).ToArray();

compressThreads =
    Enumerable
    .Range(0, degreeOfParallelism)
    .Select(_ => Task.Run(ParallelCompressor)).ToArray();

writerThread = Task.Run(ParallelWriter);

Нам нужен ограниченный канал с емкостью в одно сообщение. Если сообщение уже есть в очереди, то есть получатели заняты обработкой предыдущего, то отправитель будет висеть в ожидании освобождения канала. Таким образом нагрузка будет автоматически балансироваться.

Метод SplitSort изменим так, чтобы он мог работать как в синхронном режиме, так и в параллельном

using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 0, FileOptions.SequentialScan);
fileSize = stream.Length;

List<SortKey>? chunk = null;
byte[]? keyBuffer = null;
char[]? charBuffer = null;

var readBuffer = pool!.Rent(maxChunkSize);
var remainingBytes = 0;
var eof = false;


while (!eof)
{
    var bytesRead = stream.ReadBlock(readBuffer, remainingBytes, maxChunkSize - remainingBytes, out eof);
    int chunkSize = remainingBytes + bytesRead;
    if (!eof)
    {
        var lastNewLine = readBuffer.AsSpan(0, bytesRead).LastIndexOf(NewLine);
        if (lastNewLine >= 0) chunkSize = lastNewLine + NewLine.Length;
        remainingBytes = remainingBytes + bytesRead - chunkSize;
    }

    var oldBuffer = readBuffer;
    if (degreeOfParallelism > 0)
    {
        await readToParse.Writer.WriteAsync((readBuffer, chunkSize));
        readBuffer = pool.Rent(maxChunkSize);
    }
    else
    {
        chunk ??= new();

        chunk.AddRange(ParseChunk(chunkSize, readBuffer,
            keyBuffer ??= pool.Rent(maxChunkSize),
            charBuffer ??= new char[1024]));

        //Сортируем и записываем чанки на диск
        chunk.Sort(comparer);
        WriteChunk(chunk);
        chunk.Clear();
    }

    //Осаток буфера переносим в начало
    if (remainingBytes > 0) oldBuffer.AsSpan(chunkSize, remainingBytes).CopyTo(readBuffer.AsSpan());
}

if (degreeOfParallelism == 0)
{
    if (readBuffer != null) pool.Return(readBuffer);
    if (keyBuffer != null) pool.Return(keyBuffer);
}

Если параметр degreeOfParallelism равен нулю, то код будет выполнятся последовательно, как и раньше. Если degreeOfParallelism >= 1, то после чтения чанка он отправится в readToParse канал и основной поток сразу же начнет читать второй чанк.

Очевидно в таком случае одним буфером для строк и ключей обойтись не получится, буферы придется каждый раз выделять новые. Чтобы не забить всю память таким образом я сразу применил ArrayPool. Ничего сложного нет: вместо оператора new вызываем метод Rent, а когда перестали пользоваться - вызываем Return.

ParallelParser, ParallelSorter и ParallelWriter выглядят так:

private async Task ParallelParser()
{
    var charBuffer = new char[1024];
    await foreach (var (readBuffer, chunkSize) in readToParse.Reader.ReadAllAsync())
    {
        var keyBuffer = pool!.Rent(maxChunkSize);
        var chunk = ParseChunk(chunkSize, readBuffer, keyBuffer, charBuffer).ToList();
        await parseToSort.Writer.WriteAsync((chunk, readBuffer, keyBuffer));

    }
}

private async Task ParallelSorter()
{
    await foreach (var item in parseToSort.Reader.ReadAllAsync())
    {
        item.Item1.Sort(comparer);
        await sortToCompress.Writer.WriteAsync(item);
    }
}

private async Task ParallelWriter()
{
    await foreach (var (buffer, bufferLength) in compressToWrite.Reader.ReadAllAsync())
    {
        var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}.tmp");
        using (var tempFile = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None, 0, FileOptions.SequentialScan))
        { 
            await tempFile.WriteAsync(buffer.AsMemory(0, bufferLength));
        }
        pool!.Return(buffer);
        tempFiles.Add(tempFileName);
    }
}

Они построены по простому принципу - читаем сообщения из канала пока они не кончатся, на каждое сообщение выполняем свое действие и отправляем дальше.

ParallelCompressor построен по тому же принципу, но содержит больше кода. Уберу его под спойлер.

Код ParallelCompressor
private async Task ParallelCompressor()
{
    var buffer = new byte[1024]; //Buffer with margin
    var outputSize = BrotliEncoder.GetMaxCompressedLength(maxChunkSize * 2);
    await foreach (var (chunk, readBuffer, keyBuffer) in sortToCompress.Reader.ReadAllAsync())
    {
        using var encoder = new BrotliEncoder(4, 22);
        var output = pool!.Rent(outputSize);
        var dest = output.AsMemory();

        var compressed = 0;
        foreach (var sk in chunk)
        {
            if (sk.Length > buffer.Length)
            {
                buffer = new byte[sk.Length];
            }

            sk.Write(buffer, 0);

            var source = buffer.AsMemory(0, sk.Length);
            while (true)
            {
                var r = encoder.Compress(source.Span, dest.Span, out var bytesConsumed, out var bytesWritten, false);
                compressed += bytesWritten;
                if (bytesConsumed > 0) source = source[bytesConsumed..];
                if (bytesWritten > 0) dest = dest[bytesWritten..];
                if (r == OperationStatus.Done) break;
                if (r == OperationStatus.InvalidData || r == OperationStatus.NeedMoreData)
                {
                    throw new InvalidOperationException();
                }
                var old = output;
                outputSize *= 2;
                output = pool.Rent(outputSize);

                old.CopyTo(output, 0);
                pool.Return(old);
                dest = output.AsMemory(compressed);

            }
        }

        while (true)
        {
            var r = encoder.Flush(dest.Span, out var bytesWritten);
            compressed += bytesWritten;
            if (r == OperationStatus.Done) break;
            if (r == OperationStatus.InvalidData || r == OperationStatus.NeedMoreData)
            {
                throw new InvalidOperationException();
            }
            var old = output;
            outputSize *= 2;
            output = pool.Rent(outputSize);

            old.CopyTo(output, 0);
            pool.Return(old);
            dest = output.AsMemory(compressed);
        }
        outputSize = compressed * 11 / 10;
        await compressToWrite.Writer.WriteAsync((output, compressed));

        pool.Return(readBuffer);
        pool.Return(keyBuffer);
    }
}

Из отсортированного списка строк и ключей записываем все в энкодер, а он периодически отдает нам блок упакованных данных. В конце надо вызвать Flush. Все осложняется тем, что метод может выполниться частично и сказать, что для продолжения недостаточно места в целевом буфере. Тогда надо выделить буфер побольше и перенести туда данные из старого.

В конце код завершения параллельной обработки: завершаем очереди и ждем завершения потоков.

readToParse.Writer.Complete();
await parserThread;
parseToSort.Writer.Complete();
await sorterThread;
sortToCompress.Writer.Complete();
await compressThread;
compressToWrite.Writer.Complete();
await writerThread;

Запускаем с размером чанка в 200 мегабайт.

SplitSort done in 00:02:21.4203828
Merge done in 00:00:39.0610435

Три минуты в сумме, есть шанс уложиться в час для 100Гб.

Посмотрим в таск менеджер:

Сортировка слиянием — не так просто, как кажется - 3

Потребление памяти выросло с 400Мб до 5,3Гб, это уже много. Почему так?

Когда код выполнялся последовательно для всех операцию использовался один набор буферов - для чтения данных, для ключей, список для сортировки и буфер для временного файла. Когда мы перешли в параллельный вариант у нас таких наборов как минимум количеству потоков + количеству каналов и свободных мест в них.

Такова, к сожалению, цена параллельности. Очень редко можно распараллелить обработку данных, не повышая размер используемой оперативной памяти.

Нагрузка на диск получилась небольшая, стоит добавить еще потоков для парсинга, сортировки и сжатия данных, то есть увеличить степень параллелизма (dop). Но это увеличит затраты памяти. Можно уменьшать размер чанка при повышении степени параллелизма.

// Значения по умолчанию
dop = Environment.ProcessorCount / 4;
chunkSize = 200 / int.Max(dop, 1);

Финальный прогон с дефолтными параметрами (dop=4, chunkSize=50)

SplitSort done in 00:00:53.8610345
Merge done in 00:00:39.7727140

Итого 1:40 (не более 1:50 за несколько прогонов).

Код со всеми бенчмарками по ссылке.

Заключение

Я очень сильно ошибся, думая что задача сортировки 100Гб файла простая. Для её решения нужно много знаний алгоритмов, библиотек, навык оптимизации программ и написания параллельного кода. А самое главное эта задача хорошо показывает способен ли программист преодолевать технические трудности и решать задачу до конца, а не пытаться найти короткий путь и опустить руки, если такого пути нет.

PS

❯ .Sort.exe ........100gb.txt
SplitSort done in 00:11:35.9023876
Merge done in 00:20:16.3989011

Автор: Стас Выщепан

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js