Я портирую некоторый код из C #, который в значительной степени зависит от Rx, и у меня возникают трудности с поиском эквивалентов C ++ для некоторых из наиболее используемых методов C #.
В частности, я хочу создать наблюдаемую из логики подписки / отписки. В C # я использую Observable.Create<TSource> Method (Func<IObserver<TSource>, Action>)
переопределить, чтобы создать наблюдаемую. Например
var observable = Observable.Create<int>(observer =>
{
observers.Add(observer);
return () =>
{
observers.Remove(observer)
};
});
Можно ли сделать то же самое с RxCpp? Я думаю, что ответ лежит в rx::observable<>::create(OnSubscribe os)
метод, но я не могу понять, как использовать его, чтобы «зарегистрировать» лямбда отписки.
В RxCpp и RxJava .subscribe () принимает подписчика. Подписчик — это подписка и наблюдатель, связанные друг с другом.
В RxCpp ваш пример может выглядеть так:
std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers(new std::list<rxcpp::subscriber<int>>());
auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){
auto it = observers->insert(observers->end(), out);
it->add([=](){
observers->erase(it);
});
});
НОТА: rxcpp::subscriber<int>
это тип забывающий, который скрывает тип наблюдателя. Это позволяет хранить его в коллекции, но вводит виртуальные функции для on_next, on_error и on_completed.