Завершение работы многопоточного приложения путем установки обработчика сигнала

В следующем коде я создаю игрушечный класс, у которого есть поток, который пишет в очередь, в то время как другой поток читает из этой очереди и печатает его в stdout, Теперь, чтобы полностью завершить работу системы, я установил обработчик для SIGINT, Я ожидаю, что обработчик сигнала настроит std::atomic<bool> переменная stopFlag, что приведет threadB протолкнуть ядовитую таблетку (страж) в очередь, threadA остановится

class TestClass
{
public:

TestClass();
~TestClass();
void shutDown();

TestClass(const TestClass&) = delete;
TestClass& operator=(const TestClass&) = delete;private:
void init();
void postResults();
std::string getResult();
void processResults();

std::atomic<bool> stopFlag;

std::mutex outQueueMutex;
std::condition_variable outQueueConditionVariable;
std::queue<std::string> outQueue;

std::unique_ptr<std::thread> threadA;
std::unique_ptr<std::thread> threadB;
};

void TestClass::init()
{
threadA = std::make_unique<std::thread>(&TestClass::processResults, std::ref(*this));
threadB = std::make_unique<std::thread>(&TestClass::postResults, std::ref(*this));
}

TestClass::TestClass():
stopFlag(false)
{
init();
}

TestClass::~TestClass()
{
threadB->join();
}

void TestClass::postResults()
{
while(true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::string name = "ABCDEF";
{
std::unique_lock<std::mutex> lock(outQueueMutex);
outQueue.push(name);
outQueueConditionVariable.notify_one();
}
if(stopFlag)
{
/*For shutting down output thread*/
auto poisonPill = std::string();
{
std::unique_lock<std::mutex> lock(outQueueMutex);
outQueue.push(poisonPill);
outQueueConditionVariable.notify_one();
}
threadA->join();
break;
}
}
}

void TestClass::shutDown()
{
stopFlag = true;
}

std::string TestClass::getResult()
{
std::string result;
{
std::unique_lock<std::mutex> lock(outQueueMutex);
while(outQueue.empty())
{
outQueueConditionVariable.wait(lock);
}
result= outQueue.front();
outQueue.pop();
}
return result;
}

void TestClass::processResults()
{
while(true)
{
const auto result = getResult();

if(result.empty())
{
break;
}

std::cout << result << std::endl;

}
}

static void sigIntHandler(std::shared_ptr<TestClass> t, int)
{
t->shutDown();
}
static std::function<void(int)> handler;

int main()
{
auto testClass = std::make_shared<TestClass>();
handler = std::bind(sigIntHandler, testClass, std::placeholders::_1);
std::signal(SIGINT, [](int n){ handler(n);});
return 0;
}

Я скомпилировал это, используя gcc 5.2, используя флаг -std = c ++ 14. При нажатии Ctrl-C на моем компьютере с CentOS 7 я получаю следующую ошибку:

terminate called after throwing an instance of 'std::system_error'
what():  Invalid argument
Aborted (core dumped)

Пожалуйста, помогите мне понять, что происходит.

-1

Решение

Что происходит, что ваш main Функция выхода немедленно уничтожает глобальный handler объект, а затем testClass, Тогда основной поток блокируется в TestClass::~TestClass, Обработчик сигнала завершает доступ к уже уничтоженным объектам, что приводит к неопределенному поведению.

Основной причиной является неопределенное владение объектом из-за общих указателей — вы не знаете, что и когда в конечном итоге уничтожает ваши объекты.


Более общий подход заключается в использовании другого потока для обработки всех сигналов и блокирования сигналов во всех других потоках. Этот поток обработки сигналов может затем вызывать любые функции после получения сигнала.

Вам также вообще не нужны умные указатели и функциональные оболочки.

Пример:

class TestClass
{
public:
TestClass();
~TestClass();
void shutDown();

TestClass(const TestClass&) = delete;
TestClass& operator=(const TestClass&) = delete;

private:
void postResults();
std::string getResult();
void processResults();std::mutex outQueueMutex;
std::condition_variable outQueueConditionVariable;
std::queue<std::string> outQueue;
bool stop = false;

std::thread threadA;
std::thread threadB;
};

TestClass::TestClass()
: threadA(std::thread(&TestClass::processResults, this))
, threadB(std::thread(&TestClass::postResults, this))
{}

TestClass::~TestClass() {
threadA.join();
threadB.join();
}

void TestClass::postResults() {
while(true) {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::string name = "ABCDEF";
{
std::unique_lock<std::mutex> lock(outQueueMutex);
if(stop)
return;
outQueue.push(name);
outQueueConditionVariable.notify_one();
}
}
}

void TestClass::shutDown() {
std::unique_lock<std::mutex> lock(outQueueMutex);
stop = true;
outQueueConditionVariable.notify_one();
}

std::string TestClass::getResult() {
std::string result;
{
std::unique_lock<std::mutex> lock(outQueueMutex);
while(!stop && outQueue.empty())
outQueueConditionVariable.wait(lock);
if(stop)
return result;
result= outQueue.front();
outQueue.pop();
}
return result;
}

void TestClass::processResults()
{
while(true) {
const auto result = getResult();
if(result.empty())
break;
std::cout << result << std::endl;
}
}

int main() {
// Block signals in all threads.
sigset_t sigset;
sigfillset(&sigset);
::pthread_sigmask(SIG_BLOCK, &sigset, nullptr);

TestClass testClass;

std::thread signal_thread([&testClass]() {
// Unblock signals in this thread only.
sigset_t sigset;
sigfillset(&sigset);
int signo = ::sigwaitinfo(&sigset, nullptr);
if(-1 == signo)
std::abort();

std::cout << "Received signal " << signo << '\n';
testClass.shutDown();
});

signal_thread.join();
}
1

Другие решения

На вашей платформе этот обработчик сигнала вызывается, когда SIGINT сигнал приходит. Список функций, которые могут быть вызваны внутри этого обработчика сигнала довольно ограничен, и вызов чего-либо еще приводит к неопределенному поведению.

1

По вопросам рекламы [email protected]