В настоящее время я тестирую функцию потокового графа в TBB.
Чтобы использовать его, я должен быть в состоянии прервать выполнение некоторого узла в графе, включая все дочерние элементы, которые зависят от него, но оставляют выполнение других дочерних элементов, которые не зависят от него.
Создание исключения из тела или вызов метода task :: cancel_group_execution () отменяет выполнение всех узлов.
#include <cstdio>
#include "tbb/flow_graph.h"
using namespace tbb::flow;
struct body
{ std::string my_name;
body( const char *name ) : my_name(name)
{
}
void operator()( continue_msg ) const
{ if (my_name == "B")
tbb::task::self().group()->cancel_group_execution();
else
{ sleep(1);
printf("%s\n", my_name.c_str());
}
}
};
int main()
{
graph g;
broadcast_node< continue_msg > start(g);
continue_node<continue_msg> a( g, body("A"));
continue_node<continue_msg> b( g, body("B"));
continue_node<continue_msg> c( g, body("C"));
continue_node<continue_msg> d( g, body("D"));
continue_node<continue_msg> e( g, body("E"));
make_edge( start, a );
make_edge( start, b );
make_edge( a, c );
make_edge( b, c );
make_edge( c, d );
make_edge( a, e );
for (int i = 0; i < 3; ++i )
try
{ start.try_put( continue_msg() );
g.wait_for_all();
} catch (...)
{ printf("Caught exception\n");
}
return 0;
}
Вы можете представить статус отмены с bool
вместо continue_msg
,
каждый process_node
получать статус узла-предшественника и задачу процесса, когда она доступна, и отправлять обновленный статус прерывания на узел-преемник.
struct body
{ std::string my_name;
body( const char *name ) : my_name(name)
{
}
bool operator()( bool avail ) const
{ if (!avail)
printf("%s skipped\n", my_name.c_str());
else
if (my_name == "B")
{ printf("%s fail\n", my_name.c_str());
avail = false; // fail task
}
else
{ sleep(1);
printf("%s\n", my_name.c_str());
}
return avail;
}
};
int main()
{
graph g;
typedef function_node<bool, bool> process_node;
typedef std::tuple<bool,bool> bool_pair;
broadcast_node< bool > start(g);
process_node a( g, unlimited, body("A"));
process_node b( g, unlimited, body("B"));
process_node c( g, unlimited, body("C"));
join_node<bool_pair> join_c(g);
function_node<bool_pair, bool> and_c(g, unlimited, [](const bool_pair& in)->bool {
return std::get<0>(in) && std::get<1>(in);
});
process_node d( g, unlimited, body("D"));
process_node e( g, unlimited, body("E"));
/*
* start -+-> A -+-> E
* | \
* | \
* | join_c -> and_c -> C -> D
* | /
* | /
* +-> B --
*/
make_edge( start, a );
make_edge( start, b );
make_edge( a, input_port<0>(join_c) );
make_edge( b, input_port<1>(join_c) );
make_edge( join_c, and_c );
make_edge( and_c, c );
make_edge( c, d );
make_edge( a, e );
for (int i = 0; i < 3; ++i )
try
{ start.try_put( true );
g.wait_for_all();
} catch (...)
{ printf("Caught exception\n");
}
return 0;
}
Если вы хотите отменить часть выполнения графа, вам нужно использовать task_group_contexts. Добавьте следующее:
#include "tbb/task.h"
и измените основную программу на следующую:
int main()
{
tbb::task_group_context tgc1;
tbb::task_group_context tgc2;
graph g1(tgc1);
graph g2(tgc2);
printf("Constructing graph\n");
broadcast_node< continue_msg > start(g1);
continue_node<continue_msg> a( g1, body("A"));
continue_node<continue_msg> b( g2, body("B"));
continue_node<continue_msg> c( g2, body("C"));
continue_node<continue_msg> d( g2, body("D"));
continue_node<continue_msg> e( g1, body("E"));
make_edge( start, a );
make_edge( start, b );
make_edge( a, c );
make_edge( b, c );
make_edge( c, d );
make_edge( a, e );
for (int i = 0; i < 3; ++i ) {
try
{
printf("broadcasting graph %d\n", i);
start.try_put( continue_msg() );
g1.wait_for_all();
g2.wait_for_all();
} catch (...)
{ printf("Caught exception\n");
}
g1.wait_for_all();
g1.reset();
g2.reset();
}
return 0;
}
Каждый task_group_context является подконтекстом родительского контекста (по умолчанию). Отмена g2 не влияет на g1. Если B выбрасывает вместо отмены, ваш улов гарантирует, что исключение не будет передано родителю. Если вы не поймаете исключение, родительский контекст также будет отменен, равно как и контекст для A и E.
Обратите внимание, что вам нужно дождаться завершения обоих графиков. Также вы должны reset()
графики для сброса continue_nodes
счетчики. На самом деле, в случае, когда исключение выдается и перехватывается, нет никакой гарантии, что g1 завершится после catch(...)
завершено, так что вам нужно сделать g1.wait_for_all()
вне try/catch
, Я отредактировал код, чтобы показать это.
Вместо того, чтобы использовать отмену, чтобы остановить часть вычисления, вы могли бы сделать B a multifunction_node
с вводом continue_msg
и один выход continue_msg
:
typedef multifunction_node<continue_msg, tuple<continue_msg> > mf_type;
struct mf_body {
std::string my_name;
mf_body(const char *name) : my_name(name) {}
void operator()(continue_msg, mf_type::output_ports_type &op) {
if(my_name == "B") {
printf("%s returning without sending\n", my_name.c_str());
return;
}
sleep(1);
get<0>(op).try_put(continue_msg());
return;
}
};
Затем вы создаете узел B:
mf_type b( g, unlimited, mf_body("B"));
и ребро от B до C будет установлено так:
make_edge( output_port<0>(b), c );
В этом случае вам не нужно разбивать график на два подграфа. Если бы узел B был отменен, он возвращается без пересылки continue_msg
его преемнику. Если узел B не пересылает сообщение, узел C не будет выполняться, потому что ему нужно два continue_msgs
начать. Вам все еще нужно сбросить график после, чтобы сбросить счетчик C.
multifunction_node
имеет преимущество в том, что вы можете переслать сообщение или нет. Предостережение здесь в том, что multifunction_node
с continue_msg
вход не похож на continue_node
, continue_node
нужно как можно больше continue_msgs
так как у него есть предшественники (плюс значение инициализации при строительстве.) multifunction_node
Тело выполняется, когда он получает continue_msg
независимо от того, сколько у него предшественников. Таким образом, для вашего графика вы не можете просто сделать все узлы multifunction_nodes
,