Как совместить стренговую обертку и приоритетную обертку на Boost Asio

Я хотел бы одновременно использовать прядь Boost.Asio и приоритетную оболочку.

Прежде чем написать свой код, я прочитал следующую информацию:

Повысьте приоритет ASIO и прядь

boost :: asio и активный объект

http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531

Зачем мне нужно прядь на соединение при использовании boost :: asio?

Я хотел бы использовать подход обертки, потому что я хотел бы использовать различные асинхронные API, такие как async_read, async_write и async_connect.
Согласно http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531 , кажется, что приоритетную оболочку и упаковочную ленту можно комбинировать.

Поэтому я написал код на основе следующего примера:

http://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio/example/cpp03/invocation/prioritised_handlers.cpp

Вот мой код:

#include <iostream>
#include <functional>
#include <queue>
#include <vector>
#include <thread>
#include <mutex>

#include <boost/asio.hpp>
#include <boost/optional.hpp>

#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1

class handler_priority_queue {
public:
template <typename Handler>
void add(int priority, Handler&& handler) {
std::cout << "add(" << priority << ")" << std::endl;
std::lock_guard<std::mutex> g(mtx_);
handlers_.emplace(priority, std::forward<Handler>(handler));
}

void execute_all() {
auto top = [&]() -> boost::optional<queued_handler> {
std::lock_guard<std::mutex> g(mtx_);
if (handlers_.empty()) return boost::none;
boost::optional<queued_handler> opt = handlers_.top();
handlers_.pop();
return opt;
};
while (auto h_opt = top()) {
h_opt.get().execute();
}
}

template <typename Handler>
class wrapped_handler {
public:
wrapped_handler(handler_priority_queue& q, int p, Handler h)
: queue_(q), priority_(p), handler_(std::move(h))
{
}

template <typename... Args>
void operator()(Args&&... args) {
std::cout << "operator() " << std::endl;
handler_(std::forward<Args>(args)...);
}

//private:
handler_priority_queue& queue_;
int priority_;
Handler handler_;
};

template <typename Handler>
wrapped_handler<Handler> wrap(int priority, Handler&& handler) {
return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler));
}

private:
class queued_handler {
public:
template <typename Handler>
queued_handler(int p, Handler&& handler)
: priority_(p), function_(std::forward<Handler>(handler))
{
std::cout << "queued_handler()" << std::endl;
}

void execute() {
std::cout << "execute(" << priority_ << ")" << std::endl;
function_();
}

friend bool operator<(
queued_handler const& lhs,
queued_handler const & rhs) {
return lhs.priority_ < rhs.priority_;
}

private:
int priority_;
std::function<void()> function_;
};

std::priority_queue<queued_handler> handlers_;
std::mutex mtx_;
};

// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, std::forward<Function>(f));
}

//----------------------------------------------------------------------

int main() {
int const num_of_threads = 4;
int const num_of_tasks = 5;

boost::asio::io_service ios;
boost::asio::strand strand(ios);handler_priority_queue pq;

for (int i = 0; i != num_of_tasks; ++i) {
ios.post(
#if ENABLE_STRAND
strand.wrap(
#endif
#if ENABLE_PRIORITY
pq.wrap(
i,
#endif
[=] {
std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl;
}
#if ENABLE_PRIORITY
)
#endif
#if ENABLE_STRAND
)
#endif
);
}

std::vector<std::thread> pool;
for (int i = 0; i != num_of_threads; ++i) {
pool.emplace_back([&]{
std::cout << "before run_one()" << std::endl;
while (ios.run_one()) {
std::cout << "before poll_one()" << std::endl;
while (ios.poll_one())
;
std::cout << "before execute_all()" << std::endl;
pq.execute_all();
}
}
);
}
for (auto& t : pool) t.join();
}

Обертки включаются следующими макросами:

#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1

Когда оба макроса включены, я получил следующий результат:

before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
execute(3)
execute(2)
execute(1)
execute(0)
before run_one()
before run_one()
before run_one()

Я ожидаю, что я получил

[called] priority,thread_id

вывод как

[called] 1,140512649541376

но я не понял

Кажется, что в функции execute(), function_() называется но wrapped_handler::operator() не называется. (Функция execute() вызывается из pq.execute_all(); в моем коде.)

void execute() {
std::cout << "execute(" << priority_ << ")" << std::endl;
function_(); // It is called.
}

template <typename Handler>
class wrapped_handler {
public:

template <typename... Args>
void operator()(Args&&... args) { // It is NOT called
std::cout << "operator() " << std::endl;
handler_(std::forward<Args>(args)...);
}

Я проследил последовательность после function_() называется.

Следующие функции называются:

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L191
https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L76
https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/strand.hpp#L158
https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.hpp#L55
https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L94

Тогда в функции bool strand_service::do_dispatch(implementation_type& impl, operation* op), операция op не вызывается, но помещается в очередь в следующей строке:

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L111

Я не уверен, почему function_() отправляется в strand_service. Я думаю, что оболочка прядей уже была развернута в следующем месте моего кода:

template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, std::forward<Function>(f));
}

Если я включил только приоритетную оболочку, я получил следующий результат.
Кажется, что работает, как я ожидал.

before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
operator()
[called] 4,140512649541376
execute(3)
operator()
[called] 3,140512649541376
execute(2)
operator()
[called] 2,140512649541376
execute(1)
operator()
[called] 1,140512649541376
execute(0)
operator()
[called] 0,140512649541376
before run_one()
before run_one()
before run_one()

Если бы я включил только упаковщик прядей, я получил следующий результат.
Кажется, что работа, как я ожидал тоже.

before run_one()
[called] 0,140127385941760
before poll_one()
[called] 1,140127385941760
[called] 2,140127385941760
[called] 3,140127385941760
[called] 4,140127385941760
before execute_all()
before run_one()
before run_one()
before run_one()

Есть идеи?

2

Решение

Я решил проблему.

Я не уверен, почему function_ () отправляется на strand_service. Я думаю, что оболочка прядей уже была развернута в следующем месте моего кода:

template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, std::forward<Function>(f));
}

Параметр f это оригинальный обработчик. Это означает, что обработчик очереди с приоритетом обернут и прячет. Обмотка пряди снаружи. Так что при вызове f, отправляется на strand_service. Этот процесс происходит в том же сервисе strand_service, поэтому обработчик не вызывается.

Чтобы решить эту проблему, добавьте h->handler_ в приоритетную очередь вместо f следующее:

// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, h->handler_);
}

handler_ является переменной-членом шаблона класса wrapped_handler, Он содержит обработчик, который не упакован.

Вот полный код:

#include <iostream>
#include <functional>
#include <queue>
#include <vector>
#include <thread>
#include <mutex>

#include <boost/asio.hpp>
#include <boost/optional.hpp>

#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1

class handler_priority_queue {
public:
template <typename Handler>
void add(int priority, Handler&& handler) {
std::cout << "add(" << priority << ")" << std::endl;
std::lock_guard<std::mutex> g(mtx_);
handlers_.emplace(priority, std::forward<Handler>(handler));
}

void execute_all() {
auto top = [&]() -> boost::optional<queued_handler> {
std::lock_guard<std::mutex> g(mtx_);
if (handlers_.empty()) return boost::none;
boost::optional<queued_handler> opt = handlers_.top();
handlers_.pop();
return opt;
};
while (auto h_opt = top()) {
h_opt.get().execute();
}
}

template <typename Handler>
class wrapped_handler {
public:
template <typename HandlerArg>
wrapped_handler(handler_priority_queue& q, int p, HandlerArg&& h)
: queue_(q), priority_(p), handler_(std::forward<HandlerArg>(h))
{
}

template <typename... Args>
void operator()(Args&&... args) {
std::cout << "operator() " << std::endl;
handler_(std::forward<Args>(args)...);
}

//private:
handler_priority_queue& queue_;
int priority_;
Handler handler_;
};

template <typename Handler>
wrapped_handler<Handler> wrap(int priority, Handler&& handler) {
return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler));
}

private:
class queued_handler {
public:
template <typename Handler>
queued_handler(int p, Handler&& handler)
: priority_(p), function_(std::forward<Handler>(handler))
{
std::cout << "queued_handler()" << std::endl;
}

void execute() {
std::cout << "execute(" << priority_ << ")" << std::endl;
function_();
}

friend bool operator<(
queued_handler const& lhs,
queued_handler const & rhs) {
return lhs.priority_ < rhs.priority_;
}

private:
int priority_;
std::function<void()> function_;
};

std::priority_queue<queued_handler> handlers_;
std::mutex mtx_;
};

// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, h->handler_);
}

//----------------------------------------------------------------------

int main() {
int const num_of_threads = 4;
int const num_of_tasks = 5;

boost::asio::io_service ios;
boost::asio::strand strand(ios);handler_priority_queue pq;

for (int i = 0; i != num_of_tasks; ++i) {
ios.post(
#if ENABLE_STRAND
strand.wrap(
#endif
#if ENABLE_PRIORITY
pq.wrap(
i,
#endif
[=] {
std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl;
}
#if ENABLE_STRAND
)
#endif
#if ENABLE_PRIORITY
)
#endif
);
}

std::vector<std::thread> pool;
for (int i = 0; i != num_of_threads; ++i) {
pool.emplace_back([&]{
std::cout << "before run_one()" << std::endl;
while (ios.run_one()) {
std::cout << "before poll_one()" << std::endl;
while (ios.poll_one())
;
std::cout << "before execute_all()" << std::endl;
pq.execute_all();
}
}
);
}
for (auto& t : pool) t.join();
}

И вот вывод:

before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
[called] 4,139903315736320
execute(3)
[called] 3,139903315736320
execute(2)
[called] 2,139903315736320
execute(1)
[called] 1,139903315736320
execute(0)
[called] 0,139903315736320
before run_one()
before run_one()
before run_one()
2

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]