Как правильно подождать, пока все наблюдатели on_completed будут вызваны, если наблюдатели будут использовать наблюдаемую (rxcpp :: наблюдаемую_показу_наследования ()):
Например:
{
Foo foo;
auto generator = [&](rxcpp::subscriber<int> s)
{
s.on_next(1);
// ...
s.on_completed();
};
auto values = rxcpp::observable<>::create<int>(generator).publish();
auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
.subscribe([&](int) { slow_function(foo); }));
auto lifetime = rxcpp::composite_subscription();
lifetime.add([&](){ wrapper.log("unsubscribe"); });
auto s2 = values.ref_count().as_blocking().subscribe(lifetime);
// hope to call something here to wait for the completion of
// s1's on_completed function
}
// the program usually crashes here when foo goes out of scope because
// the slow_function(foo) is still working on foo. I also noticed that
// s1's on_completed never got called.
Мой вопрос состоит в том, как ждать, пока s1 on_completed не будет завершен, без необходимости устанавливать и опрашивать некоторые переменные.
Мотивация использования наблюдателя () заключается в том, что обычно значения имеют несколько наблюдателей, и я хотел бы, чтобы каждый наблюдатель работал одновременно. Возможно, есть разные способы достижения одной и той же цели, я открыт для всех ваших предложений.
Слияние двух позволит одной подписке блокировать ждать окончания обоих.
{
Foo foo;
auto generator = [&](rxcpp::subscriber<int> s)
{
s.on_next(1);
s.on_next(2);
// ...
s.on_completed();
};
auto values = rxcpp::observable<>::create<int>(generator).publish();
auto work = values.
observe_on(rxcpp::observe_on_new_thread()).
tap([&](int c) {
slow_function(foo);
}).
finally([](){printf("s1 completed\n");}).
as_dynamic();
auto start = values.
ref_count().
finally([](){printf("s2 completed\n");}).
as_dynamic();
// wait for all to finish
rxcpp::observable<>::from(work, start).
merge(rxcpp::observe_on_new_thread()).
as_blocking().subscribe();
}
Несколько баллов.
поток должен возвращать тот же тип, чтобы слияние работало. если вы объединяете потоки разных типов, используйте вместо этого comb_latest.
порядок наблюдаемых в наблюдаемых<> :: from () важен, стартовый поток имеет ref_count, поэтому он должен быть вызван последним, чтобы следующее слияние подписалось на работу перед запуском генератора.
У слияния есть два потока, вызывающих его. Это требует использования поточно-ориентированной координации. rxcpp является платным. по умолчанию операторы предполагают, что все вызовы поступают из одного потока. любой оператор, который получает вызовы из нескольких потоков, должен получить поточно-ориентированную координацию, которую оператор использует для навязывания потокобезопасного управления состоянием и выходных вызовов.
При желании один и тот же экземпляр координатора может быть использован для обоих.