У меня есть устройство, которое передает некоторые события. Я хочу использовать реактивные расширения для моделирования следующего поведения:
Я знаю, как ждать подключения ключа (1.):
auto waitForDongle = events.take_while([](auto const & event) {
return event == DongleConnected
}).subscribe([](auto) {});
И я знаю, как захватить поток (2.):
auto streamMotionData = events.take_while([](auto const &) { return !keyboardPressed(); })
.map([](auto const & evt) -> boost::optional<std::vector<double>> {
...
return data;
}).subscribe([](vector<double> const &) { ...});
Моя проблема в том, что я не знаю, как объединить потоки, чтобы вернуться к 1. и позже 2. Я просто знаю, как сделать это один раз и последовательно. Но я хочу поведение, описанное выше.
Это связано с распространенным примером перетаскивания UX в Rx. В этом случае пресс представляет собой соединение с ключом, а выпуск — это удаление ключа.
Это решение требует, чтобы только один ключ мог быть подключен за раз (удаление ожидается до следующего подключения). Там должно быть больше информации в случае, чтобы позволить нескольким соединениям перекрываться.
Вот суть решения. Вся программа ниже.
auto DatasFromConnectedDongle = DongleConnects. // when connected
map([=](DongleEvent const & event){
assert(event == DongleEvent::Connected);
cout << "Connected - " << flush;
return DongleDatas. // return all the datas
take_until(DongleRemoves). // stop when removed
finally([](){
cout << "- Removed" << endl;
});
}).
switch_on_next(). // only listen to datas from the most recent connected dongle
take_until(Exits); // stop everything when key is pressed
Я в конечном итоге с помощью repeat()
, но только для получения данных тестового события.
#include <rxcpp/rx.hpp>
namespace Rx {
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::util;
}
using namespace Rx;
#include <cassert>
using namespace std;
using namespace std::chrono;
int main()
{
//
// test code
//
auto keyboardPressed = [](){
return false;
};
enum class DongleEvent {
Connected,
Removed,
Data,
Other
};
auto events = from(
DongleEvent::Data, DongleEvent::Other,
DongleEvent::Connected, DongleEvent::Data,
DongleEvent::Other, DongleEvent::Other,
DongleEvent::Data, DongleEvent::Removed,
DongleEvent::Other, DongleEvent::Data).
repeat(5). // send the above events five times over
zip(take_at<0>(), interval(milliseconds(200))). // pace our test data
publish().
ref_count(); // publish and ref_count make the events sharable
//
// the solution
//
// fires when connected
auto DongleConnects = events.
filter([](DongleEvent const & event) {
return event == DongleEvent::Connected;
});
// fires when data arrives
auto DongleDatas = events.
filter([](DongleEvent const & event) {
return event == DongleEvent::Data;
});
// fires when removed
auto DongleRemoves = events.
filter([](DongleEvent const & event) {
return event == DongleEvent::Removed;
});
// fires when key pressed
auto Exits = interval(milliseconds(200)).
filter([=](long){
return keyboardPressed();
});
auto DatasFromConnectedDongle = DongleConnects.
map([=](DongleEvent const & event){
assert(event == DongleEvent::Connected);
cout << "Connected - " << flush;
return DongleDatas. // return all the datas
take_until(DongleRemoves). // stop when removed
finally([](){
cout << "- Removed" << endl;
});
}).
switch_on_next(). // only listen to datas from the most recent connected dongle
take_until(Exits); // stop everything when key is pressed
DatasFromConnectedDongle.subscribe([](DongleEvent const & event){
assert(event == DongleEvent::Data);
cout << "Data " << flush;
});
return 0;
}
производит
~/source/rxcpp/Rx/v2/examples/dongle (master)$ cmake .
...
~/source/rxcpp/Rx/v2/examples/dongle (master)$ make
Scanning dependencies of target dongle
[ 50%] Building CXX object CMakeFiles/dongle.dir/main.cpp.o
[100%] Linking CXX executable dongle
[100%] Built target dongle
~/source/rxcpp/Rx/v2/examples/dongle (master)$ ./dongle
Connected - Data Data - Removed
Connected - Data Data - Removed
Connected - Data Data - Removed
Connected - Data Data - Removed
Connected - Data Data - Removed
~/source/rxcpp/Rx/v2/examples/dongle (master)$
Других решений пока нет …