Как сделать многопоточность с «приоритетом»?

У меня есть несколько потоков, обрабатывающих несколько файлов в фоновом режиме, пока программа простаивает.
Чтобы повысить пропускную способность диска, я использую критические разделы, чтобы гарантировать, что никакие два потока никогда не используют одинаковые диск одновременно.

Код (псевдо) выглядит примерно так:

void RunThread(HANDLE fileHandle)
{
// Acquire CRITICAL_SECTION for disk
CritSecLock diskLock(GetDiskLock(fileHandle));

for (...)
{
// Do some processing on file
}
}

Как только пользователь запрашивает файл для обработки, мне нужно остановить все темыКроме тот, который обрабатывает запрошенный файл. Как только файл обработан, я хотел бы возобновить все потоки снова.

Учитывая то что SuspendThread плохая идея, как мне остановить все потоки, кроме той, которая обрабатывает соответствующий ввод?

Какие типы потоков / объектов мне понадобятся — мьютексы, семафоры, события или что-то еще? И как бы я их использовал? (Я надеюсь на совместимость с Windows XP.)

1

Решение

Сложность здесь не является приоритетом как таковой, это факт, что вы хотите, чтобы поток вышел из блокировки, которую он держит, чтобы позволить другому потоку взять его. «Приоритет» относится к тому, какой из набора запускаемых потоков должен быть запланирован для запуска — вы хотите сделать поток работоспособным, который не работает (потому что он ожидает блокировки, удерживаемой другим потоком).

Итак, вы хотите реализовать (как вы выразились):

if (ThisThreadNeedsToSuspend()) { ReleaseDiskLock(); WaitForResume(); ReacquireDiskLock(); }

Так как вы (разумно) используете блокировку с ограничением, я бы хотел инвертировать логику:

while (file_is_not_finished) {
WaitUntilThisThreadCanContinue();
CritSecLock diskLock(blah);
process_part_of_the_file();
}
ReleasePriority();

...

void WaitUntilThisThreadCanContinue() {
MutexLock lock(thread_priority_mutex);
while (thread_with_priority != NOTHREAD and thread_with_priority != thisthread) {
condition_variable_wait(thread_priority_condvar);
}
}

void GiveAThreadThePriority(threadid) {
MutexLock lock(thread_priority_mutex);
thread_with_priority = threadid;
condition_variable_broadcast(thread_priority_condvar);
}

void ReleasePriority() {
MutexLock lock(thread_priority_mutex);
if (thread_with_priority == thisthread) {
thread_with_priority = NOTHREAD;
condition_variable_broadcast(thread_priority_condvar);
}
}

Читайте об условных переменных — они есть во всех последних ОС с похожими базовыми операциями. Они также в Boost и в C ++ 11.

Если вы не можете написать функцию process_part_of_the_file тогда ты не сможешь структурировать это так. Вместо этого вам нужна блокировка с ограничением по объему, которая может освободить и восстановить блокировку диска. Самый простой способ сделать это — сделать его мьютексом, тогда вы можете ждать на condvar, используя тот же мьютекс. Вы все еще можете использовать пару мьютекс / condvar и thread_with_priority объект во многом таким же образом.

Вы выбираете размер «части файла» в зависимости от того, насколько быстро система реагирует на изменение приоритета. Если вам нужно, чтобы это было очень тогда схема действительно не работает — это совместная многозадачность.

Я не совсем доволен этим ответом. Поток с приоритетом может долго зависать, если есть много других потоков, которые уже ожидают такой же блокировки диска. Я бы добавил больше мыслей, чтобы избежать этого. Возможно, не должно быть блокировки на диск, скорее всего все должно обрабатываться с помощью переменной условия и связанного с ним мьютекса. Надеюсь, это поможет вам начать.

1

Другие решения

Я рекомендую вам сделать это совершенно по-другому. Если вам действительно нужен только один поток для каждого диска (я не уверен, что это хорошая идея), тогда вы должны создать один поток на диск и распределять файлы по мере их очереди для обработки.

Чтобы реализовать приоритетные запросы для определенных файлов, я должен был бы поток проверять «слот приоритета» в нескольких точках во время его обычной обработки (и, конечно, в его цикле ожидания основной очереди).

5

Вы можете попросить темы изящно остановиться. Просто проверьте некоторую переменную в цикле внутри потоков и продолжайте или прекращайте работу в зависимости от ее значения.

Несколько мыслей по этому поводу:

  • Установка и проверка этого значения должны быть выполнены в критической секции.
  • Поскольку критическая секция замедляет поток, проверка должна выполняться достаточно часто, чтобы быстро останавливать поток, когда это необходимо, и достаточно редко, чтобы поток не останавливался при получении и освобождении критической секции.
1

После того, как каждый рабочий поток обработает файл, проверьте переменную условия, связанную с этим потоком. Условная переменная может быть реализована просто как bool + критический раздел. Или с функциями InterlockedExchange *. И если честно, я обычно просто использую незащищенный бул между потоками, чтобы сигнализировать «нужно выйти» — иногда с дескриптором события, если рабочий поток мог спать.

После установки условной переменной для каждого потока главный поток ожидает выхода каждого потока через WaitForSingleObject.

DWORD __stdcall WorkerThread(void* pThreadData)
{
ThreadData* pData = (ThreadData*) pTheradData;

while (pData->GetNeedToExit() == false)
{
ProcessNextFile();
}
return 0;
}

void StopWokerThread(HANDLE hThread, ThreadData* pData)
{
pData->SetNeedToExit = true;
WaitForSingleObject(hThread);
CloseHandle(hThread);
}

struct ThreadData()
{
CRITICAL_SECITON _cs;
ThreadData()
{
InitializeCriticalSection(&_cs);
}
~ThreadData()
{
DeleteCriticalSection(&_cs);
}

ThreadData::SetNeedToExit()
{
EnterCriticalSection(&_cs);
_NeedToExit = true;
LeaveCriticalSeciton(&_cs);
}

bool ThreadData::GetNeedToExit()
{
bool returnvalue;
EnterCriticalSection(&_cs);
returnvalue = _NeedToExit = true;
LeaveCriticalSeciton(&_cs);
return returnvalue;
}

};
1

Вы также можете использовать пул потоков и регулировать их работу с помощью Порт завершения ввода / вывода.

Обычно потоки из пула ожидают события / действия порта завершения ввода-вывода.
Когда у вас есть запрос, порт завершения ввода / вывода освобождает поток, и он начинает выполнять работу.

1

Хорошо, как насчет этого:

Два потока на диск для запросов с высоким и низким приоритетом, каждый со своей собственной входной очередью.

Дисковая задача с высоким приоритетом при первоначальной отправке будет выдавать запросы на диск параллельно с любой запущенной задачей с низким приоритетом. Он может сбросить событие ManualResetEvent, которого ожидает поток с низким приоритетом, когда это возможно, (WaitForSingleObject) и, таким образом, будет заблокировано, если поток с высоким приоритетом выполняет дисковые операции. Высокоприоритетный поток должен установить событие после завершения задачи.

Это должно ограничить перегрузку диска интервалом (если таковой имеется) между представлением задачи с высоким приоритетом и тем, когда поток с низким приоритетом может ожидать MRE. Повышение приоритета ЦП потока, обслуживающего высокоприоритетную очередь, может помочь в улучшении производительности высокоприоритетной работы в этом интервале.

Редактировать: под «очередью» я подразумеваю потокобезопасную, блокирующую очередь производителя-потребителя (просто для ясности :).

Дополнительное редактирование — если потокам выдачи требуется уведомление о завершении задания, задачи, выданные в очереди, могут содержать событие «OnCompletion» для вызова с объектом задачи в качестве параметра. Обработчик событий может, например, сигнализировать AutoResetEvent о том, что ожидает исходный поток, обеспечивая синхронное уведомление.

0
По вопросам рекламы ammmcru@yandex.ru
Adblock
detector