Неочевидная проблема при жизни с std :: обещанием и std :: будущим

Этот вопрос очень похож на предыдущий здесь: состояние гонки в pthread_once ()?

По сути, это та же проблема — время жизни std::promise окончание во время звонка promise::set_value (то есть: после того, как соответствующее будущее было помечено, но до pthread_once выполнил)

Итак, я знаю, что мое использование имеет эту проблему, и поэтому я не могу использовать его таким образом. Тем не менее, я думаю, что это не очевидно. (По мудрым словам Скотта Мейера: Сделать интерфейсы простыми в использовании, правильными и трудными в использовании неправильно)

Я представляю пример ниже:

  • У меня есть тема (dispatcher) который вращается в очереди, выдвигая «работу» ( std::function) и выполняет его.
  • У меня есть служебный класс под названием synchronous_job который блокирует вызывающий поток до тех пор, пока в потоке диспетчера не будет выполнено «задание»
  • std::promise а также std::future являются членами synchronous_job — однажды future установлено, заблокированный вызывающий поток продолжается, что приводит к synchronous_job выскочив из стека и быть уничтоженным.
  • К сожалению, в настоящее время dispatcher был переключен контекст в то время как внутри promise::set_value; future помечен, но вызов pthread_once не выполнено, и стек pthread каким-то образом поврежден, что означает в следующий раз: тупик

Я бы ожидал звонка promise::set_value быть атомным; тот факт, что он должен сделать больше работы после того, как он пометил future неизбежно приведет к такого рода проблемам при использовании этих классов таким образом.

Поэтому мой вопрос: как добиться такой синхронизации с помощью std::promise а также std::future, сохраняя их время жизни, связанное с классом, который обеспечивает этот механизм синхронизации?

@Jonathan Wakely, не могли бы вы использовать какой-нибудь класс в стиле RAII, который устанавливает condition_variable в своем деструкторе после того, как он помечает future? Это будет означать, что даже если promise разрушен в разгар вызова к set_valueдополнительная работа по установке переменной условия будет выполнена правильно. Просто идея, не уверен, что вы можете использовать это …

Полный рабочий пример ниже и трассировка стека заблокированного приложения после:

#include <iostream>
#include <thread>
#include <future>
#include <queue>

struct dispatcher
{
dispatcher()
{
_thread = std::move(std::thread(&dispatcher::loop, this));
}
void post(std::function<void()> job)
{
std::unique_lock<std::mutex> l(_mtx);
_jobs.push(job);
_cnd.notify_one();
}
private:
void loop()
{
for (;;)
{
std::function<void()> job;
{
std::unique_lock<std::mutex> l(_mtx);
while (_jobs.empty())
_cnd.wait(l);
job.swap(_jobs.front());
_jobs.pop();
}
job();
}
}
std::thread                       _thread;
std::mutex                        _mtx;
std::condition_variable           _cnd;
std::queue<std::function<void()>> _jobs;
};
//-------------------------------------------------------------

struct synchronous_job
{
synchronous_job(std::function<void()> job, dispatcher& d)
: _job(job)
, _d(d)
, _f(_p.get_future())
{
}
void run()
{
_d.post(std::bind(&synchronous_job::cb, this));
_f.wait();
}
private:
void cb()
{
_job();
_p.set_value();
}
std::function<void()> _job;
dispatcher&           _d;
std::promise<void>    _p;
std::future<void>     _f;
};
//-------------------------------------------------------------

struct test
{
test()
: _count(0)
{
}
void run()
{
synchronous_job job(std::bind(&test::cb, this), _d);
job.run();
}
private:
void cb()
{
std::cout << ++_count << std::endl;
}
int _count;
dispatcher _d;
};
//-------------------------------------------------------------

int main()
{
test t;
for (;;)
{
t.run();
}
}

Трассировка стека заблокированного приложения:

Нить 1 (основная нить)

#0  0x00007fa112ed750c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007fa112a308ec in __gthread_cond_wait (__mutex=<optimized out>, __cond=<optimized out>) at /hostname/tmp/syddev/Build/gcc-4.6.2/gcc-build/x86_64-unknown-linux-gnu/libstdc++-v3/include/x86_64-unknown-linux-gnu/bits/gthr-default.h:846
#2  std::condition_variable::wait (this=<optimized out>, __lock=...) at ../../../../libstdc++-v3/src/condition_variable.cc:56
#3  0x00000000004291d9 in std::condition_variable::wait<std::__future_base::_State_base::wait()::{lambda()#1}>(std::unique_lock<std::mutex>&, std::__future_base::_State_base::wait()::{lambda()#1}) (this=0x78e050, __lock=..., __p=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/condition_variable:93
#4  0x00000000004281a8 in std::__future_base::_State_base::wait (this=0x78e018) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:331
#5  0x000000000042a2d6 in std::__basic_future<void>::wait (this=0x7fff0ae515c0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:576
#6  0x0000000000428dd8 in synchronous_job::run (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:60
#7  0x0000000000428f97 in test::run (this=0x7fff0ae51660) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:83
#8  0x0000000000427ad6 in main () at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:99

Резьба 2 (диспетчерская резьба)

#0  0x00007fa112ed8b5b in pthread_once () from /lib64/libpthread.so.0
#1  0x0000000000427946 in __gthread_once (__once=0x78e084, __func=0x4272d0 <__once_proxy@plt>) at /hostname/sdk/gcc470/suse11/x86_64/bin/../lib/gcc/x86_64-unknown-linux-gnu/4.7.0/../../../../include/c++/4.7.0/x86_64-unknown-linux-gnu/bits/gthr-default.h:718
#2  0x000000000042948b in std::call_once<void (std::__future_base::_State_base::*)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >, std::reference_wrapper<bool> >(std::once_flag&, void (std::__future_base::_State_base::*&&)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const&&, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >&&, std::reference_wrapper<bool>&&) (__once=..., __f=
@0x7fa111ff6be0: (void (std::__future_base::_State_base::*)(std::__future_base::_State_base * const, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>()> &, bool &)) 0x42848a <std::__future_base::_State_base::_M_do_set(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&)>) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/mutex:819
#3  0x000000000042827d in std::__future_base::_State_base::_M_set_result(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>, bool) (this=0x78e018, __res=..., __ignore_failure=false) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:362
#4  0x00000000004288d5 in std::promise<void>::set_value (this=0x7fff0ae515a8) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:1206
#5  0x0000000000428e2a in synchronous_job::cb (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:66
#6  0x000000000042df53 in std::_Mem_fn<void (synchronous_job::*)()>::operator() (this=0x78c6e0, __object=0x7fff0ae51580) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:554
#7  0x000000000042d77c in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::__call<void, , 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) (this=0x78c6e0, __args=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1156
#8  0x000000000042cb28 in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::operator()<, void>() (this=0x78c6e0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1215
#9  0x000000000042b772 in std::_Function_handler<void (), std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)> >::_M_invoke(std::_Any_data const&) (__functor=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1926
#10 0x0000000000429f2c in std::function<void ()>::operator()() const (this=0x7fa111ff6da0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:2311
#11 0x0000000000428c3c in dispatcher::loop (this=0x7fff0ae51668) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:39

10

Решение

std::promise как любой другой объект: вы можете получить к нему доступ только из одного потока за раз. В этом случае вы звоните set_value() и уничтожение объекта из отдельных потоков без достаточной синхронизации: нигде в спецификации не сказано, что set_value не будет касаться promise объект после создания future готовы.

Однако, поскольку это будущее используется для одноразовой синхронизации, вам все равно не нужно делать это: создайте пару обещание / будущее прямо в run()и передать обещание в ветку:

struct synchronous_job
{
synchronous_job(std::function<void()> job, dispatcher& d)
: _job(job)
, _d(d)
{
}
void run(){
std::promise<void> p;
std::future<void> f=p.get_future();

_d.post(
[&]{
cb(std::move(p));
});

f.wait();
}
private:
void cb(std::promise<void> p)
{
_job();
p.set_value();
}
std::function<void()> _job;
dispatcher&           _d;
};
16

Другие решения

В прямом ответе на ваш вопрос правильным ответом будет дать std::promise в тему. Таким образом, он гарантированно существует, пока поток этого хочет.

Под капотом std::future а также std::promise иметь общее состояние, на которое указывают оба, и гарантированно останется доступным, пока обе стороны не будут уничтожены. Концептуально, это похоже на обещание и будущее, когда отдельные копии shared_ptr относятся к одному и тому же объекту. Этот объект содержит необходимые базовые механизмы для передачи состояний, блоков и других операций.

Что касается попытки подать сигнал об уничтожении, проблема в том, где будет существовать эта переменная условия? Общая область разрушается, как только все связанные с ней фьючерсы и обещания разрушаются. Тупик возникает из-за того, что область разрушается, пока она еще используется (поскольку компилятор не знает, что другой поток все еще получает доступ к обещанию, поскольку он уничтожается). Добавление дополнительных переменных условия в любое общее состояние не поможет, так как они также будут уничтожены.

2

Отвечая на мой собственный вопрос, предложить работоспособное решение. Не использует std::promise или же std::future, но он достигает синхронизации, которую я ищу.

Обновить synchronous_job использовать std::condition_variable а также std::mutex вместо:

Редактировать: Обновлен, чтобы включить логический флаг в соответствии с предложением Дейв С

struct synchronous_job
{
synchronous_job(std::function<void()> job, dispatcher& d)
: _job(job)
, _d(d)
, _done(false)
{
}
void run()
{
_d.post(std::bind(&synchronous_job::cb, this));
std::unique_lock<std::mutex> l(_mtx);
if (!_done)
_cnd.wait(l);
}
private:
void cb()
{
_job();
std::unique_lock<std::mutex> l(_mtx);
_done = true;
_cnd.notify_all();
}
std::function<void()>   _job;
dispatcher&             _d;
std::condition_variable _cnd;
std::mutex              _mtx;
bool                    _done;
};
1

Канонический ответ — никогда не связывать std :: bind to этот а точнее в std :: weak_ptr. Когда вы получите обратный вызов, заблокируйте его и проверьте NULL, прежде чем вызывать обратный вызов.

Или, повторюсь, никогда не вызывайте функцию-член (извне) из области, которая не содержит shared_ptr для объекта.

1
По вопросам рекламы [email protected]