boost :: interprocess очереди сообщений Race Условие при создании?

Я пытаюсь отладить спорадические нарушения доступа, которые происходят внутри очереди сообщений boost :: interprocess. (нарушение доступа при чтении адреса в области общей памяти).

Окружающая среда: boost 1.54, VC ++ 2010. Происходит в обоих Debug & Выпуск сборок.

Это всегда происходит в строке 854 или около нее (в случае приема) в message_queue.hpp:
Комментарии были добавлены мной

      recvd_size     = top_msg.len; // top_msg points to invalid location

Или строка 756 (в случае отправки)

BOOST_ASSERT(free_msg_hdr.priority == 0); // free_msg_hdr points to invalid location

Похоже, что это связано с созданием очереди сообщений. Если очередь сообщений создается «правильно» (т.е. без возможного условия гонки), ошибка никогда не возникает.
В противном случае это может произойти в timed_receive () или timed_send () в очереди в случайное время.

Я пришел с коротким примером, который представляет проблему:
К сожалению, я не могу запустить его на Coliru, так как он требует двух процессов.
Один должен быть запущен без каких-либо параметров, второй — с любым отдельным параметром.
После нескольких запусков один из процессов завершится сбоем в message_queue.

#include <iostream>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <boost/assert.hpp>
#include <boost/date_time.hpp>

using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; // microsec_clock is ambiguous between boost::posix_time and boost::interprocess. What are the odds?

int main(int argc, wchar_t** argv)
{
while(true)
{
int proc = 0;
message_queue* queues[2] = {NULL, NULL};
std::string names[] = {"msgq0", "msgq1"};
if(1 == argc)
{
proc = 0;
message_queue::remove(names[0].c_str());
if(NULL != queues[0]) { delete queues[0]; queues[0] = NULL; }
queues[0] = new message_queue(open_or_create, names[0].c_str(), 128, 10240);

bool bRet = false;
do
{
try
{
if(NULL != queues[1]) { delete queues[1]; queues[1] = NULL; }
queues[1]=new message_queue(open_only, names[1].c_str());
bRet = true;
}
catch(const interprocess_exception&)
{
//boost::this_thread::sleep(boost::posix_time::milliseconds(2));
delete queues[1];
queues[1] = NULL;
continue;
}
}while(!bRet);

}
else
{
proc = 1;
message_queue::remove(names[1].c_str());
if(NULL != queues[1]) { delete queues[1]; queues[1] = NULL; }
queues[1] = new message_queue(open_or_create, names[1].c_str(), 128, 10240);

bool bRet = false;
do
{
try
{
if(NULL != queues[0]) { delete queues[0]; queues[0] = NULL; }
queues[0]=new message_queue(open_only, names[0].c_str());
bRet = true;
}
catch(const interprocess_exception&)
{
//boost::this_thread::sleep(boost::posix_time::milliseconds(2));
delete queues[0];
queues[0] = NULL;
continue;
}
}while(!bRet);
}

long long nCnt = 0;
for(int i = 0; i < 1; ++i)
{
if(proc)
{
std::string sOut;
sOut = "Proc1 says: Hello ProcA " + std::to_string(nCnt) + " ";
sOut.resize(10230, ':');
for(int n = 0; n < 3; ++n)
{
queues[1]->timed_send(sOut.data(), sOut.size(), 0, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
}

bool bMessage = false;
for(int n = 0; n < 3; ++n)
{
size_t nRec; unsigned int nPrio;
std::string sIn; sIn.resize(10240);
bMessage = queues[0]->timed_receive(&sIn[0], 10240, nRec, nPrio, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
if(bMessage)
{
sIn.resize(nRec);
//std::cout << sIn << " ";
}
}
if(bMessage)
{
//std::cout << std::endl;
}
}
else
{
std::string sOut;
sOut = "Proc0 says: Hello Procccccccdadae4325a " + std::to_string(nCnt);
sOut.resize(10240, '.');
for(int n = 0; n < 3; ++n)
{
queues[0]->timed_send(sOut.data(), sOut.size(), 0, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
}

bool bMessage = false;
for(int n = 0; n < 3; ++n)
{
size_t nRec; unsigned int nPrio;
std::string sIn; sIn.resize(10240);
bMessage = queues[1]->timed_receive(&sIn[0], 10240, nRec, nPrio, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
if(bMessage)
{
sIn.resize(nRec);
//std::cout << sIn << " ";
}
}
if(bMessage)
{
//std::cout << std::endl;
}
}

nCnt++;
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
}
}
return 0;
}

Я все еще думаю, что могу делать что-то не так, так как я не могу найти что-либо об этой проблеме где-либо еще, и библиотеки boost обычно очень хороши.

Есть ли что-то, что я мог бы сделать неправильно с использованием message_queue в этом примере?

2

Решение

Я не думаю что и то и другое процессы с использованием open_or_create это поддерживаемая идиома Знаете ли вы о эта тема в списке рассылки? Я не могу найти больше обсуждений, поэтому мне кажется, что управление жизненным циклом в конечном итоге не считалось необходимым добавить.

Таким образом, вам нужно будет синхронизировать создание вручную с boost::interprocess или, возможно, с помощью одного из процессов, повторяющих open_only очередь, пока другой процесс не создаст ее.

1

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]