Я работаю над многопоточным проектом на C ++, который отправляет данные в серию сетевых подключений. Вот некоторый псевдокод, который иллюстрирует, что происходит:
class NetworkManager
{
Thread writer; // responsible for writing data in queues to the network
Queue[] outqueue; // holds data until the network is ready to receive it
Network[] nets; // sockets or whatever
Mutex[] outlock; // protects access to members of outqueue
Mutex managerlock; // protects access to all queues
Condition notifier; // blocks the write thread when there is no data
}
На самом деле все намного сложнее, но я исключил множество ненужных деталей. Одна важная деталь заключается в том, что сеть ограничена по скорости, и способность программы ставить данные в очередь независимо от ее отправки является особенностью проекта (программе не нужно ждать, чтобы обработать новые данные, потому что она блокирует запись по сети. ).
Вот краткое описание того, как программа должна взаимодействовать с этим классом. Обратите внимание, что QueueWriteToNetwork
а также DoAdministrativeStuff
в моей реализации управляется ОДНИМ ЖЕ внешним потоком.
QueueWriteToNetwork(network, data) // responsibility of external thread
Let i = the index of the network to send to
Lock(outlock[i])
outqueue[i].Add(data)
Unlock(outlock[i])
Signal(notifier)
DoAdministrativeStuff(network, more) // responsibility of external thread
Lock(managerlock)
more.Process() // might do any of the following:
// connect or disconnect networks
// add or remove networks from list
// immediate write data to network, bypassing rate limiting
// other things that I forgot
Unlock(managerlock)
WriterThreadMain() // responsibility of internal write thread
Lock(managerlock)
Loop forever:
Check for data in every queue (locking and unlocking each queue)
If all queues have no data to write:
Wait(notifier, managerlock)
continue
If outqueue[i] has data ready to write
Lock(outlock[i])
Send data from outqueue[i]
outqueue[i].Pop()
Unlock(outqueue[i])
Как вы могли бы видеть, есть несколько проблем с этим подходом (например, если запись ставится в очередь в сети с QueueWriteToNetwork
как WriterThreadMain
проверяет, являются ли очереди пустыми, вызов Signal(notifier)
потенциально может быть отброшен, и очередь записи может оставаться в ожидании, даже если данные были готовы).
Мне нужно сформулировать это так, чтобы было возможно следующее:
DoAdministrativeStuff
функция должна иметь возможность гарантировать, что поток записи заблокирован в безопасном состоянии (то есть не имеет доступа к какой-либо очереди, блокировке очереди или сети)Я исследовал возможность использования семафора для отслеживания количества элементов в очередях записи. Это решило бы проблему потерянного обновления, о которой я упоминал ранее.
Наконец, я ориентируюсь на Linux (используя библиотеки Posix для предоставления типов pthread_t
, pthread_mutex_t
, pthread_cond_t
, а также sem_t
), и меня не волнует совместимость с Windows. Также, пожалуйста, не рекомендуйте Boost. Вставка любого заголовка Boost в мой код делает сборку невыносимо долгой.
Задача ещё не решена.
Других решений пока нет …