Я пытаюсь понять, как использовать rxcpp, у меня сложилось впечатление, что, когда наблюдаемое излучает значение, все подписчики получают уведомление, вызывая свои методы on_next (), передавая им излученное значение.
Это не так со следующим примером:
auto eventloop = rxcpp::observe_on_event_loop();
printf("Start task\n");
auto values = rxcpp::observable<>::interval(std::chrono::seconds(2)).map(
[](int i){
printf("Observable sending: %d\n", i);
return i;
}
);
values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#1 onNext: %d\n", v);},
[](){printf("#1 onCompleted\n");});
values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#2 onNext: %d\n", v);},
[](){printf("#2 onCompleted\n");});
printf("Finish task\n");
Я ожидал, что результат будет примерно таким:
Start task
Observable sending: 1
#1 onNext: 1
#2 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
#2 onNext: 2
#2 onCompleted
Finish task
то есть on_next вызывается на всех подписанных наблюдателях, когда приходит новое значение.
Вместо этого результат на самом деле:
Start task
Observable sending: 1
#1 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
Observable sending: 1
#2 onNext: 1
Observable sending: 2
#2 onNext: 2
#2 onCompleted
Finish task
Это классическое поведение против горячего и холодного.
Горячая наблюдаемая будет делать, как вы ожидаете. Интервал является холодной наблюдаемой, поэтому каждая подписка создает независимый набор значений.
Оператор публикации возьмет одну наблюдаемую в холодном состоянии и поделится ею как наблюдаемой в горячем состоянии.
В этом случае это будет.
auto sharedvalues = values.publish().ref_count();
Тогда используйте sharedvalues
вместо values
в выражениях подписки.
Поиск горячих и холодных наблюдаемых найдет широкое обсуждение этой темы.
Других решений пока нет …