rxcpp — почему не вызывается функция on_next всех наблюдателей, когда наблюдаемое излучает значение

Я пытаюсь понять, как использовать rxcpp, у меня сложилось впечатление, что, когда наблюдаемое излучает значение, все подписчики получают уведомление, вызывая свои методы on_next (), передавая им излученное значение.

Это не так со следующим примером:

auto eventloop = rxcpp::observe_on_event_loop();

printf("Start task\n");

auto values = rxcpp::observable<>::interval(std::chrono::seconds(2)).map(
[](int i){
printf("Observable sending: %d\n", i);
return i;
}
);

values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#1 onNext: %d\n", v);},
[](){printf("#1 onCompleted\n");});

values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#2 onNext: %d\n", v);},
[](){printf("#2 onCompleted\n");});

printf("Finish task\n");

Я ожидал, что результат будет примерно таким:

Start task
Observable sending: 1
#1 onNext: 1
#2 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
#2 onNext: 2
#2 onCompleted
Finish task

то есть on_next вызывается на всех подписанных наблюдателях, когда приходит новое значение.

Вместо этого результат на самом деле:

Start task
Observable sending: 1
#1 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
Observable sending: 1
#2 onNext: 1
Observable sending: 2
#2 onNext: 2
#2 onCompleted
Finish task

3

Решение

Это классическое поведение против горячего и холодного.

Горячая наблюдаемая будет делать, как вы ожидаете. Интервал является холодной наблюдаемой, поэтому каждая подписка создает независимый набор значений.

Оператор публикации возьмет одну наблюдаемую в холодном состоянии и поделится ею как наблюдаемой в горячем состоянии.

В этом случае это будет.

auto sharedvalues = values.publish().ref_count();

Тогда используйте sharedvalues вместо values в выражениях подписки.

Поиск горячих и холодных наблюдаемых найдет широкое обсуждение этой темы.

2

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]