Сортировка слиянием — не так просто, как кажется

от автора

В одной конторе соискателю на позицию Senior C# developer выдали тестовое задание: отсортировать файл со строками определенного формата.

Требования такие:

  • Формат строки: число, точка, пробел, далее любые символы до конца строки

  • Порядок сортировки — сначала сортируем текстовой части строки, потом по числу если текстовые части совпадают

  • Кодировка — UTF-8

  • Размер файла — 100гб — гарантированно больше объема ОП

  • Должно отработать за 1 час на машине проверяющего, вряд ли там будет супер-быстрый SSD и огромное количество оперативной памяти

Как и многие другие программисты, узнав о таком тестовом задании, я возмутился. Внешнюю сортировку слиянием практически всех проходили в ВУЗе, но практически никто никогда не писал её. Задача очень непрактическая и непонятно какие навыки проверяет. Так мне казалось.

Эта задача вызвала бурные обсуждения о способах её решения. Многие программисты, причисляющие себя к рангу senior, предложили использовать базы данных, ибо не барское это дело — вручную писать алгоритмы сортировки. Некоторые даже попытались сделать решение на Apache Spark. Однако никто до конца задачу не решил, ибо мало кому удалось отсортировать в нужном порядке даже 10ГБ файл менее чем за 15 минут без SSD.

Я подумал, что стоит решить задачу до конца с помощью программирования, и тоже причислить себя к рангу senior developer.

В первую очередь я написал генератор тестового файла, который генерирует нужное количество строк из исходного файла. В качестве исходного взял первый том Войны и Мира, так как там есть как русские, так и английские символы.

Код генератора
var source = (from l in File.ReadLines("source.txt")               where !string.IsNullOrEmpty(l)               from s in l.Split(new[] { '.', '?', '!', '[', ']' }, StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)               where s.Length > 10               select s).ToList();  Random rand = new();  using (var f = File.CreateText(file)) {     f.AutoFlush = false;     while(f.BaseStream.Position < maxSize)     {         var n = rand.Next();         f.Write(n);         f.Write(". ");         f.WriteLine(source[rand.Next(source.Count)]);     } } return 0;

Для начала решил сгенерировать 10ГБ, чтобы не ждать час на каждом тестовом прогоне. Кроме того файл такого размера не помещается в кэши операционной системы и операции чтения-записи доходят до диска, что дает представление о реальном быстродействии на больших объемах.

Самое простое работающее решение

Все началось со статьи на хабре о внешней сортировке. Сразу отбросил идею нескольких прогонов для объединения блоков, так как это привело бы к дополнительным затратам на запись. Весь код разделил на две фазы — разбиение исходного файла на отдельные блоки (чанки, от английского chunk) и сортировка строк в блоках, слияние блоков в один файл.

Код разбиения

var count = 0; var tempFiles =     File.ReadLines(file)         .Select(s => new Item(s, s.IndexOf('.')))         .Chunk(chunkSize)         .Select(chunk =>         {             Array.Sort(chunk, comparer);             var tempFileName = Path.ChangeExtension(file, $".part-{count++}" + Path.GetExtension(file));             File.WriteAllLines(tempFileName, chunk.Select(x => x.Line));             return tempFileName;         }).ToList();

Код слияния

try {     var mergedLines = tempFiles         .Select(f => File.ReadLines(f).Select(s => new Item(s, s.IndexOf('.'))))         .Merge(comparer) // IEnumerable<IEnumerable<T>> -> IEnumerable<T>         .Select(x => x.Line);     File.WriteAllLines(Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file)), mergedLines); } finally {     tempFiles.ForEach(File.Delete); }

Для того, чтобы удобнее писать код, определил тип, содержащий строку и позицию точки в строке и компаратор для этого типа

public record struct Item(string Line, int DotPosition); public record Comparer(StringComparison StringComparison) : IComparer<Item> {     public int Compare(Item x, Item y)     {         var spanX = x.Line.AsSpan();         var spanY = y.Line.AsSpan();         var xDot = x.DotPosition;         var yDot = y.DotPosition;          var cmp = spanX[(xDot + 2)..].CompareTo(spanY[(yDot + 2)..], StringComparison);         if (cmp != 0) return cmp;         return int.Parse(spanX[..xDot]) - int.Parse(spanY[..yDot]);     } }

«Сердце» всего алгоритма внешней сортировки — слияние итераторов

public static IEnumerable<T> Merge<T>(this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer = default) {     var enumerators = (from source in sources                         let e = source.GetEnumerator()                         where e.MoveNext()                         select e).ToList();                  while (enumerators.Count > 0)     {         var min = enumerators.MinBy(e => e.Current, comparer)!;         yield return min.Current;         if (!min.MoveNext())         {             min.Dispose();             enumerators.Remove(min);         }     } }

Почему я не использовал async\await? Ведь сейчас все программисты C# втыкают async\await на автомате. Конечно я тоже так сделал сначала, но потом убрал.

Во-первых для асинхронных итераторов сложнее написать Merge. Во-вторых код с async\await медленнее работал. async\await несет дополнительные расходы на переключение контекста, продолжения вызывают всю цепочку асинхронных методов. Это может быть выгодно когда нам надо распараллелить ожидание, но в этом коде никаких параллельных ожиданий нет. Все операции происходят последовательно.

Первый запуск

Запустил сортировку слиянием, размер чанка — 1М строк или около 157Мб, время работы — 15:30, пятнадцать с половиной минут! В час для 100Гб уложиться не выйдет.

Что по вашему тормозило в этом коде больше всего? Напишите свой вариант в комментариях, прежде чем разворачивать спойлер и читать дальше.

Тайминг
SplitSort done in 00:04:59.2942000 Merge done in 00:10:32.1238153

Диспетчер задач показывал, что во время сортировки ресурсы компьютера задействуются очень мало

Нагрузка на процессор в фазе разбиения (ЦП7 выполнял код)
Нагрузка на процессор в фазе разбиения (ЦП7 выполнял код)
Нагрузка на диск в фазе разбиения
Нагрузка на диск в фазе разбиения

Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/naive

Оптимизируем слияние

Дольше всего выполняется не чтение или запись, а поиск минимального элемента во время слияния. Этот код я честно написал сам, не подсматривая в готовые решения. Гораздо эффективнее будет отсортировать итераторы один раз, а далее поддерживать их отсортированность после вызова .MoveNext(), даже на StackOverflow предлагают такой вариант.

Лучше всего подойдет двоичная (она же бинарная) куча. Она имеет минимальный элемент в корне и позволяет восстановить отсортированность за O(logN), где K — количество элементов в куче (у нас равно числу чанков). Естественно это я не сам придумал, а подсмотрел в интернете.

Методы работы с кучей
public static void Heapify<T>(this Span<T> heap, int index, IComparer<T> comparer) {     ArgumentNullException.ThrowIfNull(comparer);      var min = index;     while (true)     {         var leftChild = 2 * index + 1;         var rightChild = 2 * index + 2;         var v = heap[index];          if (rightChild < heap.Length && comparer.Compare(v, heap[rightChild]) > 0)         {             min = rightChild;             v = heap[min];         }          if (leftChild < heap.Length && comparer.Compare(v, heap[leftChild]) > 0)         {             min = leftChild;         }          if (min == index) break;          var temp = heap[index];         heap[index] = heap[min];         heap[min] = temp;          index = min;     } }  public static void BuildHeap<T>(this Span<T> heap, IComparer<T> comparer) {     ArgumentNullException.ThrowIfNull(comparer);      for (int i = heap.Length / 2; i >= 0; i--)     {         Heapify(heap, i, comparer);     } }

Код метода слияния

public static IEnumerable<T> Merge<T>(this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer = default) {     var heap = (from source in sources                 let e = source.GetEnumerator()                 where e.MoveNext()                 select e).ToArray();      var enumeratorComparer = new EnumeratorComparer<T>(comparer ?? Comparer<T>.Default);     heap.AsSpan().BuildHeap(enumeratorComparer);      while (true)     {         var min = heap[0];         yield return min.Current;         if (!min.MoveNext())         {             min.Dispose();             if (heap.Length == 1) yield break;             heap[0] = heap[^1];             Array.Resize(ref heap, heap.Length - 1);         }         heap.AsSpan().Heapify(0, enumeratorComparer);     } }  private record EnumeratorComparer<T>(IComparer<T> comparer) : IComparer<IEnumerator<T>> {     public int Compare(IEnumerator<T>? x, IEnumerator<T>? y)     {         return comparer.Compare(x!.Current, y!.Current);     } }

Остальной код программы не изменился. Время работы:

SplitSort done in 00:04:27.8391844 Merge done in 00:02:11.4364005

Значительно лучше, но до заветного часа на 100ГБ еще очень далеко. Тут стоит обратить внимание, что из-за кэша файловой системы время работы может варьироваться +\-15%

Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/heapsort

Оптимизируем разбиение

Фазы разбиения и слияния выполняют одинаковое количество чтения-записи, создают одинаковое количество объектов типа string, но фаза разбиения использует в 2,5 раз больше памяти и запуск под отладчиком показывает множество сборок мусора.

Все дело во времени жизни объектов. В фазе слияния объект строки живет от чтения из чанка до записи в результирующий файл. Когда считывается следующая строка из чанка предыдущая уже превратилась мусор. Мусор убирается в нулевом поколении сборщика, это происходит быстро и память не растет.

В фазе разбиения объекты строк живут от чтения из исходного файла до записи в чанк. Большинство объектов строк переживает несколько сборок мусора, что создает повышенную активность сборщика и увеличивает потребляемую память.

Мы не можем уменьшить время жизни строк на фазе разбиения. Но их можно вообще не создавать! Можно прочитать из файла блок символов, разделить по символу перевода строки и использовать вместо строк тип ReadOnlyMemory<char>, который предоставляет ту же функциональность. ReadOnlyMemory<char> это структура (не требует аллокаций в управляемой куче), которая представляет из себя ссылку на массив, смещение и длину.

Код разбиения без аллокаций

List<string> tempFiles = new(); List<Item> chunk = new(); using (var reader = File.OpenText(file)) {     var chunkBuffer = new char[chunkSize];     var chunkReadPosition = 0;     var eos = reader.EndOfStream;     while (!eos)     {         // Читаем из файла весь буфер         var charsRead = reader.ReadBlock(chunkBuffer.AsSpan(chunkReadPosition));         eos = reader.EndOfStream;         var m = chunkBuffer.AsMemory(0, chunkReadPosition + charsRead);          // Заполняем список строк ReadOnlyMemory<char> для сортировки         int linePos;         while ((linePos = m.Span.IndexOf(Environment.NewLine)) >= 0 || (eos && m.Length > 0))         {             var line = linePos >= 0 ? m[..linePos] : m;             chunk.Add(new Item(line, line.Span.IndexOf('.')));             m = m[(linePos + Environment.NewLine.Length)..];         }          chunk.Sort(comparer);          // Записываем строки из отсортированного списка во временный файл         var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}" + Path.GetExtension(file));         using (var tempFile = File.CreateText(tempFileName))         {             foreach (var (l, _) in chunk)             {                 tempFile.WriteLine(l);             }         }         tempFiles.Add(tempFileName);          if (eos) break;         chunk.Clear();          //Отсток буфера переносим в начало         m.CopyTo(chunkBuffer);         chunkReadPosition = m.Length;     } }

Можно было бы оставить код в функциональном стиле, но тогда код получился бы более неуклюжим из-за необходимости передачи флага конца файла.

В структурах данных заменил string наReadOnlyMemory<char>и больше ничего не изменилось.

Время работы при размере чанка в 100М символов, 161Мб на диске:

SplitSort done in 00:03:50.6780519 Merge done in 00:02:19.5627238

Удалось выиграть еще 30 сек и сократить расход памяти на фазе разбиения со 600 до 250 мегабайт. Как говорится Allocation is cheap… until it is not (https://tooslowexception.com/allocation-is-cheap-until-it-is-not/ статья от другом, но заголовок подходит).

Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/reduced-allocations

К сожалению на этом все простые оптимизации кончились, а суммарное время работы все еще не позволит уложиться в час.

Как сравнивать строки

Для многих программистов сравнение строк это все еще посимвольное, а для тех кто пришел из С — побайтное сравнение. Но примерно с 2000 года все используют юникод. Юникод это не просто два байта на символ и кодировки переменной длины, вроде UTF8, это еще правила сравнения, нормализации и подсчета символов. Кто еще не в курсе — посмотрите доклад Plain Text Дилана Битти на NDC. Это один из лучших докладов за всю историю конференций.

Сравнение юникодных строк описано в стандарте Unicode Collation Algorithm (UCA). Это очень сложный алгоритм, который опирается на таблицы весов символом для разных культур. Этот алгоритм реализован в операционной системе (CompareStringW, CompareStringEx в Windows и CompareString из libSystem.Globalization.Native.so в Linux).

Конечно можно от этого всего отказаться и сравнивать строки посимвольно, это ускорит сортировку почти на минуту, так как .NET не использует системные API для этого. Достаточно указать StringComparison.Ordinal в Comparer. Кроме того, отказ от UCA позволяет использовать поразрядные (radix) алгоритмы сортировки, которые должны работать быстрее обычных. Но изменит порядок сортировки и фактически является оптимизацией под один частный случай. Не будет простых способов вернуться к UCA без потери быстродействия.

Один из шагов UCA — получение ключа сортировки (sort key) для строк — простого массива байт, который можно использовать для побайтного сравнения. Оказывается в .NET есть функция получения ключа сортировки строк CompareInfo.GetSortKey. То есть мы можем получить эти байты и потом сравнивать их. Если дописать в конец полученного массива байты числа, стоящего в начале, то мы можем всю сортировку свести к сортировке байтовых массивов.

Скоро 15 лет как я программирую на .NET и я узнал о наличии ключей сортировки строк и соответствующих классов только когда решал эту задачу.

Пытаемся оптимизировать сортировку

Для начала добавим получение ключей и сортировку по ним в методы разбиения и слияния

List<string> tempFiles = new(); List<Item> chunk = new(); using (var reader = File.OpenText(file)) {     var keyBuffer = new byte[chunkSize * 2]; //Буфер для ключей     var chunkBuffer = new char[chunkSize];     var chunkReadPosition = 0;     var eos = reader.EndOfStream;     while (!eos)     {         // Читаем из файла весь буфер         var charsRead = reader.ReadBlock(chunkBuffer.AsSpan(chunkReadPosition));         eos = reader.EndOfStream;         var m = chunkBuffer.AsMemory(0, chunkReadPosition + charsRead);         var key = keyBuffer.AsMemory();          // Заполняем список строк ReadOnlyMemory<char> для сортировки         int linePos;         while ((linePos = m.Span.IndexOf(Environment.NewLine)) >= 0 || (eos && m.Length > 0))         {             var line = linePos >= 0 ? m[..linePos] : m;             var s = line.Span;             var dot = line.Span.IndexOf('.');             int x = int.Parse(s[..dot]);             s = s[(dot + 2)..];             var keyLen = culture.CompareInfo.GetSortKey(s, key.Span);    // Получаем ключ             BinaryPrimitives.WriteInt32BigEndian(key[keyLen..].Span, x); // Добписываем число в конец ключа, чтобы старшый байт был с меньшим индексом             keyLen += sizeof(int);              chunk.Add(new Item(line, key[..keyLen]));             m = m[(linePos + Environment.NewLine.Length)..];             key = key[keyLen..];         }          chunk.Sort(comparer);          // Записываем строки из отсортированного списка во временный файл         var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}" + Path.GetExtension(file));         using (var tempFile = File.CreateText(tempFileName))         {             foreach (var (l, _) in chunk)             {                 tempFile.WriteLine(l);             }         }         tempFiles.Add(tempFileName);          if (eos) break;         chunk.Clear();          //Остаток буфера переносим в начало         m.CopyTo(chunkBuffer);         chunkReadPosition = m.Length;     } }

При слиянии нам также надо получать ключи

try {     var mergedLines = tempFiles         .Select(f => File.ReadLines(f).Select(s => // Читаем построчно все файлы          {             var m = s.AsMemory();             var dot = s.IndexOf('.');              // Находим в строках точку             int x = int.Parse(s.AsSpan(0, dot));              // Получаем ключ того, что находится после точки с пробелом             var key = new byte[s.Length * 2 + sizeof(int)];             var keyLen = culture.CompareInfo.GetSortKey(m[(dot + 2)..].Span, key);                           // Дописываем число в конец             BinaryPrimitives.WriteInt32BigEndian(key.AsSpan(keyLen), x);                      return new Item(m, key);         }))         .Merge(comparer);  //Слияние итераторов IEnumerable<IEnumerable<T>> в IEnumerable<T>        using var sortedFile = File.CreateText(Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file)));     foreach (var (l, _) in mergedLines)     {         sortedFile.WriteLine(l);     } } finally {     tempFiles.ForEach(File.Delete); }

Компаратор теперь очень простой

public record struct Item(ReadOnlyMemory<char> Line, ReadOnlyMemory<byte> Key); public class Comparer : IComparer<Item> {     public int Compare(Item x, Item y)     {         return x.Key.Span.SequenceCompareTo(y.Key.Span);     } }

Результаты ожидаемо хуже

SplitSort done in 00:04:09.5091207 Merge done in 00:03:02.5646277

Мы проиграли 40 секунд на слиянии из-за получения ключей и 10 секунд на разбиении и сортировке. Сортировка ключей оказалась эффективнее, чем сортировка строк, но накладные расходы на получение ключей убили весь выигрыш.

Зато теперь можно применить поразрядную (Radix) сортировку ключей. Я написал два варианта поразрядной сортировки — Radix Quick Sort aka Multi-key QuickSort (просто перевел на C# алгоритм описанный в статье) и Counting Radix Sort (в основном скопировал код отсюда). К сожалению оба варианта проиграли стандартному Array.Sort(Код этих сортировок в статье не привожу, чтобы не забивать объем, но вы сможете найти его в исходниках вместе с бенчмарками по ссылке в конце статьи). Скорее всего потому, что сравнение блоков памяти методом SequenceCompareTo оптимизируется с помощью SIMD и работает гораздо быстрее, чем ручной код сравнения по разрядам.

Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/sort-key

На этом месте я устал и лег спать.

А что если сохранять ключи?

С этой мыслью я проснулся на следующий день.

  • Во-первых сохраняя ключи во временном файле мы можем не получать ключ сортировки через API в фазе слияния.

  • Во-вторых нам вообще даже не надо декодировать символы в фазе слияния, мы можем просто сохранять нужное количество байт в выходном файле.

  • В-третьих, спустившись на уровень файловых потоков (FileStream вместо StreamReader) мы сможем эффективнее управлять буферизацией.

Я сделал бенчмарк, где сравнил все способы построчного чтения файлов, где сравнил File.ReadLines, StreamReader, FileStream и различные варианты буферизации, а также модный молодежный PipeReader. Победил, ожидаемо, FileStream, как самый низкоуровневый инструмент. Кроме того если вы будете читать или записывать данные большими блоками, то выгодно отключать встроенную буферизацию .NET, а если маленькими, то указывать большой размер буфера (код бенчмарков по ссылке в конце статьи).

Много кода

Фаза разбиения

public void SplitSort() {     using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 0, FileOptions.SequentialScan);     fileSize = stream.Length;      List<SortKey> chunk = new();      var keyBuffer = new byte[maxChunkSize];     var readBuffer = new byte[maxChunkSize];     var remainingBytes = 0;      var charBuffer = new char[1024];     var eof = false;     while (!eof)     {         var bytesRead = stream.ReadBlock(readBuffer, remainingBytes, maxChunkSize - remainingBytes, out eof);         int chunkSize = remainingBytes + bytesRead;         if (!eof)         {             var lastNewLine = readBuffer.AsSpan(0, bytesRead).LastIndexOf(NewLine);             if (lastNewLine >= 0) chunkSize = lastNewLine + NewLine.Length;             remainingBytes = remainingBytes + bytesRead - chunkSize;         }          chunk.AddRange(ParseChunk(chunkSize, readBuffer, keyBuffer, charBuffer));          //Сортируем и записываем чанки на диск         chunk.Sort(comparer);         WriteChunk(chunk);          chunk.Clear();         //Остаток буфера переносим в начало         if (remainingBytes > 0) readBuffer.AsSpan(chunkSize, remainingBytes).CopyTo(readBuffer.AsSpan());     } }

Функция чтения строк и получения ключей сортировки

private IEnumerable<SortKey> ParseChunk(int byteCount, byte[] readBuffer, byte[] keyBuffer, char[] charBuffer) {     var readPos = 0;     var key = keyBuffer.AsMemory();     while (byteCount > 0)     {         var linePos = readBuffer.AsSpan(readPos, byteCount).IndexOf(NewLine);         if (linePos == -1) linePos = byteCount;         if (charBuffer.Length < linePos) charBuffer = new char[linePos];          // Надо обязательно вызывать именно эту перегрузку, потому что остальные аллоцируют память         var lineLen = encoding.GetChars(readBuffer, readPos, linePos, charBuffer, 0);         var line = charBuffer.AsMemory(0, lineLen);         var s = line.Span;         var dot = s.IndexOf('.');         var x = int.Parse(s[0..dot]);          var keyLen = culture.CompareInfo.GetSortKey(s[(dot + 2)..], key.Span, compareOptions);         BinaryPrimitives.WriteInt32BigEndian(key[keyLen..].Span, x);         keyLen += sizeof(int);          var lineSize = linePos + NewLine.Length;         yield return new SortKey(readBuffer.AsMemory(readPos, lineSize), key[..keyLen]);         key = key[keyLen..];          readPos += lineSize;         byteCount -= lineSize;         maxLineSize = Math.Max(maxLineSize, lineSize);         maxKeyLength = Math.Max(maxKeyLength, keyLen);     } }

Функция записи чанка на диск

void WriteChunk(List<SortKey> chunk) {     // Записываем строки из отсортированного списка во временный файл     var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}.tmp");     using var stream = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None, BufferSize, FileOptions.SequentialScan);              Span<byte> buffer = stackalloc byte[sizeof(int)];     foreach (var (line, key) in chunk)     {         BinaryPrimitives.WriteInt32LittleEndian(buffer, line.Length);         stream.Write(buffer);         stream.Write(line.Span);          BinaryPrimitives.WriteInt32LittleEndian(buffer, key.Length);         stream.Write(buffer);         stream.Write(key.Span);     }     tempFiles.Add(tempFileName); }

Фаза слияния

public void Merge() {     var mergedLines = tempFiles         .Select(ReadTempFile) // Читаем построчно все файлы, находим в строках точку         .Merge(comparer);  //Слияние итераторов IEnumerable<IEnumerable<T>> в IEnumerable<T>      string sortedFileName = Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file));     using var sortedFile = new FileStream(sortedFileName, FileMode.Create, FileAccess.Write, FileShare.None, BufferSize, FileOptions.SequentialScan);     sortedFile.SetLength(fileSize);     foreach (var (l, _) in mergedLines)     {         sortedFile.Write(l.Span);     } }

Чтение временного файла

private IEnumerable<SortKey> ReadTempFile(string file) {     using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, BufferSize, FileOptions.SequentialScan);      var maxBlockSize = maxLineSize + maxKeyLength + sizeof(int) * 2;     var readBuffer = new byte[Math.Max(BufferSize, maxBlockSize)];      var bytesRemaining = 0;     var eof = false;      while (!eof)     {         var bytesRead = stream.ReadBlock(readBuffer, bytesRemaining, readBuffer.Length - bytesRemaining, out eof);         if (bytesRead == 0) eof = true;         var mem = readBuffer.AsMemory(0, bytesRemaining + bytesRead);          while (mem.Length > maxBlockSize || (eof && mem.Length > 0))         {              var lineSize = BinaryPrimitives.ReadInt32LittleEndian(mem.Span);             mem = mem[sizeof(int)..];              var line = mem[..lineSize];             mem = mem[lineSize..];              var keyLen = BinaryPrimitives.ReadInt32LittleEndian(mem.Span);             mem = mem[sizeof(int)..];              yield return new SortKey(line, mem[..keyLen]);             mem = mem[keyLen..];         }          mem.CopyTo(readBuffer);          bytesRemaining = mem.Length;     } }

Из 25 строк кода в самом начале, написанных даже без классов и метода Main, всё превратилось в 150 строк без учета конструктора и полей класса.

Результаты забега при установке размера чанка в 100М байт. Так как теперь вместе со строками записываются ключи размер одного временного файла на диске составляет 180МБ.

SplitSort done in 00:04:12.8286312 Merge done in 00:03:05.3477665

Результат приблизительно равен предыдущему, но это при учете что теперь мы пишем и читаем не 10Гб временных файлов, в 18гб. В таск менеджере заметно, что быстродействие теперь сильно упирается в диск.

Если быстродействие сильно упирается в диск, то нужно данные сжать. Так мне говорила бабушка прочитал в книге по базам данных. Завернем FileStream в BrotliStream при записи и чтении временных файлов. Brotli — это новый алгоритм сжатия, который пока еще приходит в веб и другие аспекты разработки. Подробнее можно прочитать на википедии.

Результаты забега со сжатием

SplitSort done in 00:04:28.3044728 Merge done in 00:00:36.4300613

В сумме меньше 5 минут. Суммарный объем временных файлов на диске сократился до 970МБ, то есть почти в 20 раз. Это понятно, так как в файлах очень много повторяющихся строк. Возможно на других текстовых файлах результат будет не настолько выдающимся, но все равно написанные человеком или chatGpt тексты будут хороши сжиматься.

Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/sort-key-with-compression

Быстродействие теперь упирается не в диск, а в процессор. И это хорошо. Диск у нас один, а процессоров зачастую больше.

Распараллеливание

Сейчас программа выполняется последовательно:

  1. Чтение чанка (нагружает диск и не использует процессор)

  2. Парсинг строк и получение ключей (нагружает процессор в основном)

  3. Сортировка (сильно нагружает процессор)

  4. Сжатие данных (сильно нагружает процессор)

  5. Запись (сильно нагружает диск)

Было бы неплохо пункты 1 и 5 выполнять параллельно с 2-4.

Заведем пять отдельных потоков для каждой задачи. Для передачи чанков между потоками воспользуемся библиотекой System.Threading.Channels.

readToParse = Channel.CreateBounded<(byte[], int)>(1); // Буфер и размер parseToSort = Channel.CreateBounded<(List<SortKey>, byte[], byte[])>(1);     // Список ключей, буфер строк и буфер ключей sortToCompress = Channel.CreateBounded<(List<SortKey>, byte[], byte[])>(1)); // Список ключей, буфер строк и буфер ключей compressToWrite = Channel.CreateBounded<(byte[], int)>(1); // Сжатые данные и размер  parserThreads =     Enumerable     .Range(0, degreeOfParallelism)     .Select(_ => Task.Run(ParallelParser)).ToArray();  sorterThreads =     Enumerable     .Range(0, degreeOfParallelism)     .Select(_ => Task.Run(ParallelSorter)).ToArray();  compressThreads =     Enumerable     .Range(0, degreeOfParallelism)     .Select(_ => Task.Run(ParallelCompressor)).ToArray();  writerThread = Task.Run(ParallelWriter);

Нам нужен ограниченный канал с емкостью в одно сообщение. Если сообщение уже есть в очереди, то есть получатели заняты обработкой предыдущего, то отправитель будет висеть в ожидании освобождения канала. Таким образом нагрузка будет автоматически балансироваться.

Метод SplitSort изменим так, чтобы он мог работать как в синхронном режиме, так и в параллельном

using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 0, FileOptions.SequentialScan); fileSize = stream.Length;  List<SortKey>? chunk = null; byte[]? keyBuffer = null; char[]? charBuffer = null;  var readBuffer = pool!.Rent(maxChunkSize); var remainingBytes = 0; var eof = false;   while (!eof) {     var bytesRead = stream.ReadBlock(readBuffer, remainingBytes, maxChunkSize - remainingBytes, out eof);     int chunkSize = remainingBytes + bytesRead;     if (!eof)     {         var lastNewLine = readBuffer.AsSpan(0, bytesRead).LastIndexOf(NewLine);         if (lastNewLine >= 0) chunkSize = lastNewLine + NewLine.Length;         remainingBytes = remainingBytes + bytesRead - chunkSize;     }      var oldBuffer = readBuffer;     if (degreeOfParallelism > 0)     {         await readToParse.Writer.WriteAsync((readBuffer, chunkSize));         readBuffer = pool.Rent(maxChunkSize);     }     else     {         chunk ??= new();          chunk.AddRange(ParseChunk(chunkSize, readBuffer,             keyBuffer ??= pool.Rent(maxChunkSize),             charBuffer ??= new char[1024]));          //Сортируем и записываем чанки на диск         chunk.Sort(comparer);         WriteChunk(chunk);         chunk.Clear();     }      //Осаток буфера переносим в начало     if (remainingBytes > 0) oldBuffer.AsSpan(chunkSize, remainingBytes).CopyTo(readBuffer.AsSpan()); }  if (degreeOfParallelism == 0) {     if (readBuffer != null) pool.Return(readBuffer);     if (keyBuffer != null) pool.Return(keyBuffer); }

Если параметр degreeOfParallelism равен нулю, то код будет выполнятся последовательно, как и раньше. Если degreeOfParallelism >= 1, то после чтения чанка он отправится в readToParse канал и основной поток сразу же начнет читать второй чанк.

Очевидно в таком случае одним буфером для строк и ключей обойтись не получится, буферы придется каждый раз выделять новые. Чтобы не забить всю память таким образом я сразу применил ArrayPool. Ничего сложного нет: вместо оператора new вызываем метод Rent, а когда перестали пользоваться — вызываем Return.

ParallelParser, ParallelSorter и ParallelWriter выглядят так:

private async Task ParallelParser() {     var charBuffer = new char[1024];     await foreach (var (readBuffer, chunkSize) in readToParse.Reader.ReadAllAsync())     {         var keyBuffer = pool!.Rent(maxChunkSize);         var chunk = ParseChunk(chunkSize, readBuffer, keyBuffer, charBuffer).ToList();         await parseToSort.Writer.WriteAsync((chunk, readBuffer, keyBuffer));      } }  private async Task ParallelSorter() {     await foreach (var item in parseToSort.Reader.ReadAllAsync())     {         item.Item1.Sort(comparer);         await sortToCompress.Writer.WriteAsync(item);     } }  private async Task ParallelWriter() {     await foreach (var (buffer, bufferLength) in compressToWrite.Reader.ReadAllAsync())     {         var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}.tmp");         using (var tempFile = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None, 0, FileOptions.SequentialScan))         {              await tempFile.WriteAsync(buffer.AsMemory(0, bufferLength));         }         pool!.Return(buffer);         tempFiles.Add(tempFileName);     } }

Они построены по простому принципу — читаем сообщения из канала пока они не кончатся, на каждое сообщение выполняем свое действие и отправляем дальше.

ParallelCompressor построен по тому же принципу, но содержит больше кода. Уберу его под спойлер.

Код ParallelCompressor
private async Task ParallelCompressor() {     var buffer = new byte[1024]; //Buffer with margin     var outputSize = BrotliEncoder.GetMaxCompressedLength(maxChunkSize * 2);     await foreach (var (chunk, readBuffer, keyBuffer) in sortToCompress.Reader.ReadAllAsync())     {         using var encoder = new BrotliEncoder(4, 22);         var output = pool!.Rent(outputSize);         var dest = output.AsMemory();          var compressed = 0;         foreach (var sk in chunk)         {             if (sk.Length > buffer.Length)             {                 buffer = new byte[sk.Length];             }              sk.Write(buffer, 0);              var source = buffer.AsMemory(0, sk.Length);             while (true)             {                 var r = encoder.Compress(source.Span, dest.Span, out var bytesConsumed, out var bytesWritten, false);                 compressed += bytesWritten;                 if (bytesConsumed > 0) source = source[bytesConsumed..];                 if (bytesWritten > 0) dest = dest[bytesWritten..];                 if (r == OperationStatus.Done) break;                 if (r == OperationStatus.InvalidData || r == OperationStatus.NeedMoreData)                 {                     throw new InvalidOperationException();                 }                 var old = output;                 outputSize *= 2;                 output = pool.Rent(outputSize);                  old.CopyTo(output, 0);                 pool.Return(old);                 dest = output.AsMemory(compressed);              }         }          while (true)         {             var r = encoder.Flush(dest.Span, out var bytesWritten);             compressed += bytesWritten;             if (r == OperationStatus.Done) break;             if (r == OperationStatus.InvalidData || r == OperationStatus.NeedMoreData)             {                 throw new InvalidOperationException();             }             var old = output;             outputSize *= 2;             output = pool.Rent(outputSize);              old.CopyTo(output, 0);             pool.Return(old);             dest = output.AsMemory(compressed);         }         outputSize = compressed * 11 / 10;         await compressToWrite.Writer.WriteAsync((output, compressed));          pool.Return(readBuffer);         pool.Return(keyBuffer);     } }

Из отсортированного списка строк и ключей записываем все в энкодер, а он периодически отдает нам блок упакованных данных. В конце надо вызвать Flush. Все осложняется тем, что метод может выполниться частично и сказать, что для продолжения недостаточно места в целевом буфере. Тогда надо выделить буфер побольше и перенести туда данные из старого.

В конце код завершения параллельной обработки: завершаем очереди и ждем завершения потоков.

readToParse.Writer.Complete(); await parserThread; parseToSort.Writer.Complete(); await sorterThread; sortToCompress.Writer.Complete(); await compressThread; compressToWrite.Writer.Complete(); await writerThread;

Запускаем с размером чанка в 200 мегабайт.

SplitSort done in 00:02:21.4203828 Merge done in 00:00:39.0610435

Три минуты в сумме, есть шанс уложиться в час для 100Гб.

Посмотрим в таск менеджер

Потребление памяти выросло с 400Мб до 5,3Гб, это уже много. Почему так?

Когда код выполнялся последовательно для всех операцию использовался один набор буферов — для чтения данных, для ключей, список для сортировки и буфер для временного файла. Когда мы перешли в параллельный вариант у нас таких наборов как минимум количеству потоков + количеству каналов и свободных мест в них.

Такова, к сожалению, цена параллельности. Очень редко можно распараллелить обработку данных, не повышая размер используемой оперативной памяти.

Нагрузка на диск получилась небольшая, стоит добавить еще потоков для парсинга, сортировки и сжатия данных, то есть увеличить степень параллелизма (dop). Но это увеличит затраты памяти. Можно уменьшать размер чанка при повышении степени параллелизма.

// Значения по умолчанию dop = Environment.ProcessorCount / 4; chunkSize = 200 / int.Max(dop, 1);

Финальный прогон с дефолтными параметрами (dop=4, chunkSize=50)

SplitSort done in 00:00:53.8610345 Merge done in 00:00:39.7727140

Итого 1:40 (не более 1:50 за несколько прогонов).

Код со всеми бенчмарками по ссылке https://github.com/gandjustas/HugeFileSort

Заключение

Я очень сильно ошибся, думая что задача сортировки 100Гб файла простая. Для её решения нужно много знаний алгоритмов, библиотек, навык оптимизации программ и написания параллельного кода. А самое главное эта задача хорошо показывает способен ли программист преодолевать технические трудности и решать задачу до конца, а не пытаться найти короткий пусть и опустить руки, если такого пути нет.

PS

❯ .\Sort.exe ..\..\..\..\100gb.txt SplitSort done in 00:11:35.9023876 Merge done in 00:20:16.3989011


ссылка на оригинал статьи https://habr.com/ru/post/714524/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *