Использование параллелизма в упорядоченных потоках данных

Exploiting Data Parallelism in Ordered Data Streams [Eng., PDF 209KB]

Аннотация

Рассмотрим приложения с большим количеством вычислений, реализующие сложные преобразования упорядоченных входных данных в упорядоченные выходные данные, например, кодирование звука и видео, сжатие без потерь, обработка сейсмических данных. Хотя алгоритмы, используемые в этих преобразованиях, в большинстве своем параллельны, управление входным и выходным порядком данных может быть непростой задачей. В данной статье рассматриваются некоторые из подобных задач и описываются стратегии их решения при сохранении производительности параллельного выполнения.

Введение

Рассмотрим задачу распараллеливания сжатия видео в реальном времени. Данные поступают из некоторого источника и должны направляться на диск или по сети. Очевидно, что использование мощности многоядерных процессоров является основным условием для соблюдения требований к работе таких приложений в реальном времени.

Стандарты сжатия видео, такие как MPEG2 и MPEG4, разработаны для обработки потоков данных, передающихся по ненадежным каналам. Далее, можно рассматривать единый видеопоток как последовательность малых самостоятельных потоков. Обрабатывая эти отдельные малые потоки параллельно, можно получить существенное ускорение процесса обработки. Одной из трудностей при применении параллелизма через многопоточность является следующее:

  • Определение не накладывающихся друг на друга наборов задач и назначение их потокам
  • Обеспечение гарантии, что входные данные читаются лишь однажды и в правильном порядке
  • Вывод блоков итоговых данных в правильном порядке независимо от того, в каком порядке они были обработаны, без ощутимой потери производительности
  • Выполнить все вышеперечисленное, не зная заранее реального объема входных данных

В других ситуациях, например, при сжатии данных без потерь, часто бывает возможным заранее определить объём входных данных и явно разделить их на независимые блоки. Описанные здесь методики в равной степени применимы и к этому случаю.

Рекомендации

Может возникнуть соблазн использовать метод «производителей-потребителей», но данный подход не является масштабируемым и может привести к дисбалансу нагрузки. В данной статье поставленные выше задачи решаются с помощью более масштабируемого метода с использованием декомпозиции данных. Представленная здесь техника состоит в создании группы потоков, в которой каждый поток читает блок видео, кодирует его и выводит в буфер реорганизации (reorder buffer). При завершении каждого блока поток читает и обрабатывает следующий блок видео, и так далее. Такое динамическое распределение работы минимизирует дисбаланс нагрузки. Буфер реорганизации отвечает за то, что блоки кодированного видео записываются в правильном порядке независимо от порядка их обработки.

Исходный алгоритм кодирования видео может иметь такую форму:
inFile = OpenFile ()
outFile == InitializeOutputFile ()
WriteHeader (outFile)
outputBuffer = AllocateBuffer (bufferSize)
while (frame = ReadNextFrame (inFile))
{
  EncodeFrame (frame, outputBuffer)
  if (outputBuffer size > bufferThreshold)
    FlushBuffer(outputBuffer, outFile)
}
FlushBuffer (outputBuffer, outFile)

Первой нашей задачей будет замена последовательности чтения и кодирования фреймов блочным алгоритмом, что ставит нас перед проблемой декомпозиции через группы потоков:

WriteHeader (outFile)
while (block = ReadNextBlock (inFile))
{
  while(frame = ReadNextFrame (block))
  {
    EncodeFrame (frame, outputBuffer)
    if (outputBuffer size > bufferThreshold)
      FlushBuffer (outputBuffer, outFile)
  }
  FlushBuffer (outputBuffer, outFile)
}

Определения блока данных в разных приложениях различны, но в случае потока видео естественной границей блока может выступать первый входной фрейм, в котором присутствует смена сцены, при этом могут быть ограничения на минимальный и максимальный размер блока. Блочная обработка требует создания входного буфера и внесения минимальных изменений в исходный код для заполнения этого буфера перед началом обработки. Подобным образом, метод readNextFrame должен быть изменен, чтобы читать данные из буфера, а не из файла.

Следующим шагом является изменение стратегии вывода в выходной буфер, чтобы блоки данных записывались целиком. Этот подход существенно упрощает реорганизацию выходных данных, поскольку теперь необходимо лишь гарантировать, что блоки выводятся в правильном порядке. В следующем фрагменте кода отражены изменения, необходимые для блочного вывода:

 

WriteHeader (outFile)
while (block = ReadNextBlock (inFile))
{
  while (frame = ReadNextFrame (block))
  {
    EncodeFrame (frame, outputBuffer)
  }
  FlushBuffer (outputBuffer, outFile)
}

 

В зависимости от максимального размера блока, может потребоваться выходной буфер большего размера.

Поскольку каждый блок не зависит от остальных, каждый выходной блок обычно начинается со специального заголовка. В случае видеопотока формата MPEG такой заголовок предваряет каждый полный фрейм, называемый I-frame, по отношению к которому определяются все последующие фреймы. В дальнейшем этот заголовок помещается в цикл, работающий с блоками:

while (block = ReadNextBlock (inFile))
{
  WriteHeader (outputBuffer)
  while (frame = ReadNextFrame (block))
  {
    EncodeFrame (frame, outputBuffer)
  }
  FlushBuffer (outputBuffer, outFile)
}

После этих изменений можно использовать параллелизм с помощью потоковой библиотеки ( Pthreads или Win32 threading API) или OpenMP.

// Create a team of threads with private
// copies of outputBuffer, block, and frame
// and shared copies of inFile and outFile
while (AcquireLock,
       block = ReadNextBlock (inFile),
       ReleaseLock, block)
{
  WriteHeader (outputBuffer)
  while (frame = ReadNextFrame (block))
  {
    EncodeFrame (frame, outputBuffer)
  }
  FlushBuffer (outputBuffer, outFile)
}

Эта простая, но эффективная стратегия безопасного чтения данных в нужном порядке. Каждый поток устанавливает блокировку, читает блок данных, затем снимает блокировку. Совместное использование одного входного файла гарантирует, что блоки данных читаются по порядку и только по одному разу. Поскольку поток всегда устанавливает блокировку, блоки данных поступают к потокам на динамическом базисе, то есть в порядке «первый вошел – первый обслужился», что обычно минимизирует возможность дисбаланса нагрузки.

Последней нашей задачей является обеспечение безопасности вывода блоков. Самым простым было бы использовать блокировку и общий файл вывода, чтобы можно было записать только один блок за раз. Такой подход гарантирует потокобезопасность, но может приводить к выводу блоков в порядке, отличном от начального. С другой стороны, потоки могут ожидать, пока все предыдущие блоки не будут записаны, затем писать свои данные. К сожалению, такой подход оказывается неэффективным, так как потоки вынуждены простаивать, ожидая своей очереди на запись.

Более интересным решением является создание циклического реорганизующего выводного буфера. Каждому блоку назначается порядковый номер. «Хвост» буфера указывает блок, который будет записан следующим. Если поток завершает обработку блока данных с номером, отличным от указанного в «хвосте», то он просто вставляет этот блок в нужную позицию в буфере и берет в работу следующий блок. Если поток обнаруживает, что завершенный блок соответствует указанному в хвосте, то он записывает этот блок в файл вместе со всеми последующими блоками, вставленными в буфер до этого. Реорганизационный буфер позволяет сохранять внеочередные завершенные блоки с гарантией того, что они будут записаны в файл в нужном порядке.


Рис. 1. Состояние буфера реорганизации перед записью.

На рис. 1 показано одно из возможных состояний реорганизационного буфера. Блоки от 0 до 35 уже были обработаны и записаны, а блоки 37, 38, 39, 40 и 42 обработаны и сохранены для последующей записи. Когда поток, обрабатывающий блок 36, завершит обработку, то он запишет все блоки с 36 по 40, приведя буфер реорганизации в состояние, показанное на рис. 2. Блок 42 остается в очереди, пока не будет завершен блок 41.


Рис. 2. Состояние буфера реорганизации после записи.

Естественно, нужно соблюдать некоторые меры предосторожности, чтобы гарантировать надежность и быстроту алгоритма:

  • Общие структуры данных должны блокироваться при операциях чтения или записи.
  • Количество свободных блоков в буфере должно быть больше количества потоков.
  • Потоки должны выжидать, если нужное место в буфере занято.
  • Нужно заранее отвести для потоков множественные выходные буферы. Это позволяет вставлять в очередь указатель на буфер и позволяет избежать лишнего копирования данных и выделения памяти.

При использовании выходной очереди окончательный алгоритм выглядит так:

inFile = OpenFile ()
outFile == InitializeOutputFile ()
// Create a team of threads with private 
// copies of outputBuffer, block, and frame, shared
// copies of inFile and outFile.
while (AcquireLock,
       block = ReadNextBlock (inFile),
       ReleaseLock, block)
{
  WriteHeader (outputBuffer)
  while (frame = ReadNextFrame (block))
  {
    EncodeFrame (frame, outputBuffer)
  }
  QueueOrFlush (outputBuffer, outFile)
}

Этот алгоритм поддерживает упорядоченный ввод и вывод, но при этом обладает гибкостью и производительностью внеочередной параллельной обработки.

Указания к применению

В некоторых случаях время на чтение и запись данных сравнимо с временем на их обработку. Тогда можно применять следующие методы:

  • • В Linux* и Windows* есть функции API, которые инициируют операции чтения или записи и затем ожидают их завершения или получают извещение о завершении. Использование этих функций для предварительного чтения входных данных и отложенной записи выходных данных при одновременном выполнении другой работы может эффективно скрыть медлительность операций ввода/вывода. В Windows файлы открываются для асинхронного ввода/вывода с помощью атрибута FILE_FLAG_OVERLAPPED. В Linux асинхронные операции выполняются через несколько функций aio_* из библиотеки libaio.
  • • Если объём входных данных значительный, то метод статической декомпозиции может привести к перегрузке физического диска, поскольку контроллер будет пытаться выполнять несколько одновременных, но не последовательных чтений. Следуя приведённому выше совету и используя общий дескриптор файла и динамичный алгоритм с обслуживанием «первый обратился первый обслужился», можно добиться порядкового последовательного чтения, что увеличит общую пропускную способность системы ввода/вывода.

Важно с осторожностью выбирать размер и количество блоков данных. Обычно большое количество блоков даёт самую гибкую возможность планирования, что уменьшает дисбаланс нагрузки. С другой стороны, очень маленькие блоки могут вызывать лишние издержки на блокировку и даже снижать эффективность алгоритмов компрессии данных.

Для получения подробной информации о возможностях оптимизации компилятора обратитесь к нашему Уведомлению об оптимизации.
Возможность комментирования русскоязычного контента была отключена. Узнать подробнее.