Как ждать обработчика asio?

У меня есть объект, который бегает вокруг boost::asio::io_service который имеет некоторые свойства. Что-то вроде того:

class Foo
{
private:

// Not an int in my real code, but it doesn't really matter.
int m_bar;
boost::asio::io_service& m_io_service;
boost::asio::strand m_bar_strand;
};

m_bar должен использоваться только из обработчика, который вызывается через нить m_bar_strand, Это позволяет мне не блокировать эти обработчики.

Чтобы установить m_bar свойство снаружи потока, который выполняется io_service::run() Я написал asynchronous_setter, вот так:

class Foo
{
public:
void async_get_bar(function<void (int)> handler)
{
m_bar_strand.post(bind(&Foo::do_get_bar, this, handler));
}

void async_set_bar(int value, function<void ()> handler)
{
m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
}

private:

void do_get_bar(function<void (int)> handler)
{
// This is only called from within the m_bar_strand, so we are safe.

// Run the handler to notify the caller.
handler(m_bar);
}

void do_set_bar(int value, function<void ()> handler)
{
// This is only called from within the m_bar_strand, so we are safe.
m_bar = value;

// Run the handler to notify the caller.
handler();
}

int m_bar;
boost::asio::io_service& m_io_service;
boost::asio::strand m_bar_strand;
};

Это прекрасно работает, но теперь я хотел бы написать синхронную версию set_bar это устанавливает значение и возвращает только тогда, когда набор был в силе. Он по-прежнему должен гарантировать, что эффективный набор будет происходить в пределах m_bar_strand, В идеале что-то реентерабельное.

Я могу представить решения с семафорами, которые будут изменены из обработчика, но все, что я придумаю, кажется хакерским и действительно не элегантным. Есть ли что-то в Boost / Boost Asio, что позволяет такое?

Как бы вы приступили к реализации этого метода?

5

Решение

Если вам нужно синхронно ждать установки значения, тогда Boost.Thread’s futures может предоставить элегантное решение:

Библиотека фьючерсов предоставляет средства для обработки синхронных будущих значений, независимо от того, генерируются ли эти значения другим потоком или в одном потоке в ответ на внешние раздражители или по требованию.

Короче говоря, boost::promise создается и позволяет установить значение для него. Значение может быть позже получено через связанный boost::future. Вот основной пример:

boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();

// start asynchronous operation that will invoke future.set_value(42)
...

assert(future.get() == 42); // blocks until future has been set.

Два других заметных преимущества этого подхода:

  • future является частью C ++ 11.
  • Исключения могут быть даже переданы future с помощью promise::set_exception(), поддержка элегантного способа предоставления исключений или ошибок вызывающей стороне.

Вот полный пример, основанный на оригинальном коде:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class Foo
{
public:

Foo(boost::asio::io_service& io_service)
: m_io_service(io_service),
m_bar_strand(io_service)
{}

public:

void async_get_bar(boost::function<void(int)> handler)
{
m_bar_strand.post(bind(&Foo::do_get_bar, this, handler));
}

void async_set_bar(int value, boost::function<void()> handler)
{
m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
}

int bar()
{
typedef boost::promise<int> promise_type;
promise_type promise;

// Pass the handler to async operation that will set the promise.
void (promise_type::*setter)(const int&) = &promise_type::set_value;
async_get_bar(boost::bind(setter, &promise, _1));

// Synchronously wait for promise to be fulfilled.
return promise.get_future().get();
}

void bar(int value)
{
typedef boost::promise<void> promise_type;
promise_type promise;

// Pass the handler to async operation that will set the promise.
async_set_bar(value, boost::bind(&promise_type::set_value, &promise));

// Synchronously wait for the future to finish.
promise.get_future().wait();
}

private:

void do_get_bar(boost::function<void(int)> handler)
{
// This is only called from within the m_bar_strand, so we are safe.

// Run the handler to notify the caller.
handler(m_bar);
}

void do_set_bar(int value, boost::function<void()> handler)
{
// This is only called from within the m_bar_strand, so we are safe.
m_bar = value;

// Run the handler to notify the caller.
handler();
}

int m_bar;
boost::asio::io_service& m_io_service;
boost::asio::strand m_bar_strand;
};

int main()
{
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);
boost::thread t(
boost::bind(&boost::asio::io_service::run, boost::ref(io_service)));

Foo foo(io_service);
foo.bar(21);
std::cout << "foo.bar is " << foo.bar() << std::endl;
foo.bar(2 * foo.bar());
std::cout << "foo.bar is " << foo.bar() << std::endl;

io_service.stop();
t.join();
}

который обеспечивает следующий вывод:

foo.bar is 21
foo.bar is 42
5

Другие решения

Вы можете использовать канал, чтобы уведомить синхронный метод, когда значение установлено в async_set_bar(), Предупреждение, приведенный ниже код скомпилирован с мозгом и, скорее всего, содержит ошибки, но он должен понять суть

#include <boost/asio.hpp>

#include <iostream>
#include <thread>

class Foo
{
public:
Foo( boost::asio::io_service& io_service ) :
_bar( 0 ),
_io_service( io_service ),
_strand( _io_service ),
_readPipe( _io_service ),
_writePipe( _io_service )
{
boost::asio::local::connect_pair( _readPipe, _writePipe );
}

void set_async( int v ) {
_strand.post( [=]
{
_bar = v;
std::cout << "sending " << _bar << std::endl;
_writePipe.send( boost::asio::buffer( &_bar, sizeof(_bar) ) );
}
);
}

void set_sync( int v ) {
this->set_async( v );
int value;
_readPipe.receive( boost::asio::buffer(&value, sizeof(value) ) );
std::cout << "set value to " << value << std::endl;
}private:
int _bar;
boost::asio::io_service& _io_service;
boost::asio::io_service::strand _strand;
boost::asio::local::stream_protocol::socket _readPipe;
boost::asio::local::stream_protocol::socket _writePipe;
};

int
main()
{
boost::asio::io_service io_service;
boost::asio::io_service::work w(io_service);
std::thread t( [&]{ io_service.run(); } );
Foo f( io_service );
f.set_sync( 20 );
io_service.stop();
t.join();
}

если вы не можете использовать лямбды c ++ 11, замените их на boost::bind и еще несколько методов обработчика завершения.

0

Вот что я придумал:

class synchronizer_base
{
protected:
synchronizer_base() :
m_has_result(false),
m_lock(m_mutex)
{
}

void wait()
{
while (!m_has_result)
{
m_condition.wait(m_lock);
}
}

void notify_result()
{
m_has_result = true;
m_condition.notify_all();
}

private:

boost::atomic<bool> m_has_result;
boost::mutex m_mutex;
boost::unique_lock<boost::mutex> m_lock;
boost::condition_variable m_condition;
};

template <typename ResultType = void>
class synchronizer : public synchronizer_base
{
public:

void operator()(const ResultType& result)
{
m_result = result;

notify_result();
}

ResultType wait_result()
{
wait();

return m_result;
}

private:

ResultType m_result;
};

template <>
class synchronizer<void> : public synchronizer_base
{
public:

void operator()()
{
notify_result();
}

void wait_result()
{
wait();
}
};

И я могу использовать это так:

class Foo
{
public:
void async_get_bar(function<void (int)> handler)
{
m_bar_strand.post(bind(&Foo::do_get_bar, this, value, handler));
}

void async_set_bar(int value, function<void ()> handler)
{
m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
}

int get_bar()
{
synchronizer<int> sync;

async_get_bar(boost::ref(sync));

return sync.wait_result();
}

void set_bar(int value)
{
synchronizer<void> sync;

async_set_bar(value, boost::ref(sync));

sync.wait_result();
}
};

boost::ref необходимо, потому что случаи synchronizer не копируются. Этого можно избежать, завернув synchronizer в каком-то другом контейнерном классе, но я в порядке с этим решением, как оно есть.

Примечание: сделать НЕ вызывайте такие «синхронизированные» функции из обработчика, иначе он может просто зайти в тупик!

0
По вопросам рекламы [email protected]