RxCpp: время жизни наблюдателя, если использовать наблюдающая (rxcpp :: наблюдаемая_нон_руда ())

Как правильно подождать, пока все наблюдатели on_completed будут вызваны, если наблюдатели будут использовать наблюдаемую (rxcpp :: наблюдаемую_показу_наследования ()):

Например:

{
Foo foo;
auto generator = [&](rxcpp::subscriber<int> s)
{
s.on_next(1);
// ...
s.on_completed();
};
auto values = rxcpp::observable<>::create<int>(generator).publish();
auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
.subscribe([&](int) { slow_function(foo); }));

auto lifetime = rxcpp::composite_subscription();
lifetime.add([&](){ wrapper.log("unsubscribe");  });
auto s2 = values.ref_count().as_blocking().subscribe(lifetime);

// hope to call something here to wait for the completion of
// s1's on_completed function
}

// the program usually crashes here when foo goes out of scope because
// the slow_function(foo) is still working on foo.  I also noticed that
// s1's on_completed never got called.

Мой вопрос состоит в том, как ждать, пока s1 on_completed не будет завершен, без необходимости устанавливать и опрашивать некоторые переменные.

Мотивация использования наблюдателя () заключается в том, что обычно значения имеют несколько наблюдателей, и я хотел бы, чтобы каждый наблюдатель работал одновременно. Возможно, есть разные способы достижения одной и той же цели, я открыт для всех ваших предложений.

4

Решение

Слияние двух позволит одной подписке блокировать ждать окончания обоих.

{
Foo foo;
auto generator = [&](rxcpp::subscriber<int> s)
{
s.on_next(1);
s.on_next(2);
// ...
s.on_completed();
};

auto values = rxcpp::observable<>::create<int>(generator).publish();

auto work = values.
observe_on(rxcpp::observe_on_new_thread()).
tap([&](int c) {
slow_function(foo);
}).
finally([](){printf("s1 completed\n");}).
as_dynamic();

auto start = values.
ref_count().
finally([](){printf("s2 completed\n");}).
as_dynamic();

// wait for all to finish
rxcpp::observable<>::from(work, start).
merge(rxcpp::observe_on_new_thread()).
as_blocking().subscribe();
}

Несколько баллов.

поток должен возвращать тот же тип, чтобы слияние работало. если вы объединяете потоки разных типов, используйте вместо этого comb_latest.

порядок наблюдаемых в наблюдаемых<> :: from () важен, стартовый поток имеет ref_count, поэтому он должен быть вызван последним, чтобы следующее слияние подписалось на работу перед запуском генератора.

У слияния есть два потока, вызывающих его. Это требует использования поточно-ориентированной координации. rxcpp является платным. по умолчанию операторы предполагают, что все вызовы поступают из одного потока. любой оператор, который получает вызовы из нескольких потоков, должен получить поточно-ориентированную координацию, которую оператор использует для навязывания потокобезопасного управления состоянием и выходных вызовов.

При желании один и тот же экземпляр координатора может быть использован для обоих.

3

Другие решения


По вопросам рекламы [email protected]