Цель следующего кода состоит в том, чтобы различные классы публиковали данные в наблюдаемой. Некоторые классы будут наблюдать все данные, некоторые будут периодически наблюдать с помощью buffer_with_time ().
Это работает хорошо до тех пор, пока программа не выйдет, а затем завершится сбоем, возможно, потому что наблюдатель, использующий buffer_with_time (), все еще висит в каком-то потоке.
struct Data
{
Data() : _subscriber(_subject.get_subscriber()) { }
~Data() { _subscriber.on_completed(); }
void publish(std::string data) { _subscriber.on_next(data); }
rxcpp::observable<std::string> observable() { return _subject.get_observable(); }
private:
rxcpp::subjects::subject<std::string> _subject;
rxcpp::subscriber<std::string> _subscriber;
};
void foo()
{
Data data;
auto period = std::chrono::milliseconds(30);
auto s1 = data.observable()
.buffer_with_time(period , rxcpp::observe_on_new_thread())
.subscribe([](std::vector<std::string>& data)
{ std::cout << data.size() << std::endl; });
data.publish("test 1");
data.publish("test 2");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// hope to call something here so s1's thread can be joined.
// program crashes upon exit
}
Я попытался вызвать «s1.unsubscribe ()» и различные as_blocking (), from (), merge (), но все еще не могу заставить программу корректно завершить работу.
Обратите внимание, что я использовал здесь «предметы», потому что «публикация» может быть вызвана из разных мест (которые могут быть из разных тем). Я не уверен, что это лучший механизм для этого, я открыт для других способов сделать это.
Совет?
Это очень близко к работе ..
Однако деструктор Data завершает ввод, а также хочет, чтобы подписка блокировала выход foo до завершения ввода, что усложняет задачу.
Вот способ убедиться, что блоки foo после разрушения данных. Это использует существующий договор на данные.
void foo1()
{
rxcpp::observable<std::vector<std::string>> buffered;
{
Data data;
auto period = std::chrono::milliseconds(30);
buffered = data.observable()
.buffer_with_time(period , rxcpp::observe_on_new_thread())
.publish().ref_count();
buffered
.subscribe([](const std::vector<std::string>& data)
{ printf("%lu\n", data.size()); },
[](){printf("data complete\n");});
data.publish("test 1");
data.publish("test 2");
// hope to call something here so s1's thread can be joined.
// program crashes upon exit
}
buffered.as_blocking().subscribe();
printf("exit foo1\n");
}
Кроме того, изменение формы данных (добавить полный метод) позволит следующий код:
struct Data
{
Data() : _subscriber(_subject.get_subscriber()) { }
~Data() { complete(); }
void publish(std::string data) { _subscriber.on_next(data); }
void complete() {_subscriber.on_completed();}
rxcpp::observable<std::string> observable() { return _subject.get_observable(); }
private:
rxcpp::subjects::subject<std::string> _subject;
rxcpp::subscriber<std::string> _subscriber;
};
void foo2()
{
printf("foo2\n");
Data data;
auto newthread = rxcpp::observe_on_new_thread();
auto period = std::chrono::milliseconds(30);
auto buffered = data.observable()
.buffer_with_time(period , newthread)
.tap([](const std::vector<std::string>& data)
{ printf("%lu\n", data.size()); },
[](){printf("data complete\n");});
auto emitter = rxcpp::sources::timer(std::chrono::milliseconds(0), newthread)
.tap([&](long) {
data.publish("test 1");
data.publish("test 2");
data.complete();
});
// hope to call something here so s1's thread can be joined.
// program crashes upon exit
buffered.combine_latest(newthread, emitter).as_blocking().subscribe();
printf("exit foo2\n");
}
Я думаю, что это лучше выражает зависимости ..