Планирование и обработка тайм-аута с помощью rxcpp

Я новичок в использовании rxcpp и пытаюсь собрать что-то функциональное вместе в следующем сценарии:

У меня есть один источник данных, который будет извлекать команды из отдельного источника, код, который я пишу, будет извлекать эти команды в наблюдаемый rxcpp. Особое условие заключается в том, что если в течение определенного периода времени не было получено ни одной команды, функция onError подписчиков будет запущена вместо onNext, но время ожидания может наступить только до получения первой команды. После получения первой команды тайм-аут не может произойти, независимо от того, сколько времени потребуется для получения каких-либо дальнейших команд.

Я пытаюсь сделать это примерно так:

auto timeout = rxcpp::observable<>::timer(std::chrono::steady_clock::now() + timeout,
rxcpp::observe_on_event_loop()).map([](int val) // Note, converts the value type of the timer observable and converts timeouts to errors
{
std::cout << "TIMED OUT!" << std::endl;
throw std::runtime_error("timeout");
return command_type();
});
auto commands = timeout.amb(rxcpp::observe_on_event_loop(), createCommandSource(event_loop_scheduler, ...));

У меня проблема в том, что тайм-аут происходит до того, как какие-либо команды будут получены, даже если они вставляются задолго до того, как истечет тайм-аут. Я экспериментировал с тайм-аутами от 1000 мс до 5000 мс, и это не имеет значения. Если я удалю код тайм-аута, команда получится немедленно, однако. Я подозреваю, что, скорее всего, я просто неправильно понял, как использовать планировщики в rxcpp, поэтому мне интересно, как это можно сделать.

2

Решение

Я написал простой createCommandSource. Это сработало для меня:

#include "rxcpp/rx.hpp"using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::util;

using namespace std;

struct command_type {};

int main()
{
auto eventloop = rxcpp::observe_on_event_loop();
auto createCommandSource = [=]() {
return rxcpp::observable<>::interval(std::chrono::seconds(1), eventloop).map([](long) {return command_type(); });
};
auto timeout = rxcpp::observable<>::timer(eventloop.now() + std::chrono::seconds(2), eventloop).map([](long ) // Note, converts the value type of the timer observable and converts timeouts to errors
{
std::cout << "TIMED OUT!" << std::endl;
throw std::runtime_error("timeout");
return command_type();
});
auto commands = timeout.amb(eventloop, createCommandSource().take(5));

commands
.as_blocking().subscribe(
[](command_type) {printf("command\n"); },
[](std::exception_ptr) {printf("execption\n"); });

std::this_thread::sleep_for(std::chrono::seconds(2));

return 0;
}
2

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]