RxCpp: как контролировать время жизни субъекта-наблюдателя при использовании с buffer_with_time

Цель следующего кода состоит в том, чтобы различные классы публиковали данные в наблюдаемой. Некоторые классы будут наблюдать все данные, некоторые будут периодически наблюдать с помощью buffer_with_time ().

Это работает хорошо до тех пор, пока программа не выйдет, а затем завершится сбоем, возможно, потому что наблюдатель, использующий buffer_with_time (), все еще висит в каком-то потоке.

struct Data
{
Data() : _subscriber(_subject.get_subscriber()) { }
~Data()  { _subscriber.on_completed(); }

void publish(std::string data) { _subscriber.on_next(data); }
rxcpp::observable<std::string> observable() { return _subject.get_observable(); }

private:
rxcpp::subjects::subject<std::string> _subject;
rxcpp::subscriber<std::string> _subscriber;
};

void foo()
{
Data data;

auto period = std::chrono::milliseconds(30);
auto s1 = data.observable()
.buffer_with_time(period , rxcpp::observe_on_new_thread())
.subscribe([](std::vector<std::string>& data)
{ std::cout << data.size() << std::endl; });

data.publish("test 1");
data.publish("test 2");
std::this_thread::sleep_for(std::chrono::milliseconds(100));

// hope to call something here so s1's thread can be joined.
// program crashes upon exit
}

Я попытался вызвать «s1.unsubscribe ()» и различные as_blocking (), from (), merge (), но все еще не могу заставить программу корректно завершить работу.

Обратите внимание, что я использовал здесь «предметы», потому что «публикация» может быть вызвана из разных мест (которые могут быть из разных тем). Я не уверен, что это лучший механизм для этого, я открыт для других способов сделать это.

Совет?

1

Решение

Это очень близко к работе ..

Однако деструктор Data завершает ввод, а также хочет, чтобы подписка блокировала выход foo до завершения ввода, что усложняет задачу.

Вот способ убедиться, что блоки foo после разрушения данных. Это использует существующий договор на данные.

void foo1()
{
rxcpp::observable<std::vector<std::string>> buffered;
{
Data data;

auto period = std::chrono::milliseconds(30);
buffered = data.observable()
.buffer_with_time(period , rxcpp::observe_on_new_thread())
.publish().ref_count();

buffered
.subscribe([](const std::vector<std::string>& data)
{ printf("%lu\n", data.size()); },
[](){printf("data complete\n");});

data.publish("test 1");
data.publish("test 2");

// hope to call something here so s1's thread can be joined.
// program crashes upon exit
}
buffered.as_blocking().subscribe();

printf("exit foo1\n");
}

Кроме того, изменение формы данных (добавить полный метод) позволит следующий код:

struct Data
{
Data() : _subscriber(_subject.get_subscriber()) { }
~Data()  { complete(); }

void publish(std::string data) { _subscriber.on_next(data); }
void complete() {_subscriber.on_completed();}
rxcpp::observable<std::string> observable() { return _subject.get_observable(); }

private:
rxcpp::subjects::subject<std::string> _subject;
rxcpp::subscriber<std::string> _subscriber;
};

void foo2()
{
printf("foo2\n");

Data data;

auto newthread = rxcpp::observe_on_new_thread();

auto period = std::chrono::milliseconds(30);
auto buffered = data.observable()
.buffer_with_time(period , newthread)
.tap([](const std::vector<std::string>& data)
{ printf("%lu\n", data.size()); },
[](){printf("data complete\n");});

auto emitter = rxcpp::sources::timer(std::chrono::milliseconds(0), newthread)
.tap([&](long) {
data.publish("test 1");
data.publish("test 2");
data.complete();
});

// hope to call something here so s1's thread can be joined.
// program crashes upon exit
buffered.combine_latest(newthread, emitter).as_blocking().subscribe();

printf("exit foo2\n");
}

Я думаю, что это лучше выражает зависимости ..

1

Другие решения


По вопросам рекламы [email protected]