Я использую RxCPP и испытываю трудности с пониманием его поведения.
Вот две программы, одна в Rx.Net и другая в RxCPP.
Они предполагают выводить одинаковые отпечатки, но это не так.
программа берет точки из потока мыши и вычисляет поток дельт между точками.
мышь — это поток из потока точек, каждый штрих —
сверху вниз нажатие — это один поток. мышь выдает такие потоки один за другим.
В этих тестах ожидаемый результат:
Дельта № 0 составляет: 0,0
Дельта № 1 составляет: 5,0
Дельта № 2 составляет: 0,5
Дельта № 3 составляет: 2,3
Который является тем, что выводит Rx.Net.
Rx.Cpp выводит только первую строку: Дельта № 0 составляет: 0,0
Любая идея?
Пример Rx.Cpp:
#include <rx.hpp>
namespace rx = rxcpp;
namespace rxsub = rxcpp::subjects;
using rxob = rx::observable<>;
struct Point
{
Point(int x, int y) : x(x), y(y) {}
int x = 0, y = 0;
Point operator-() const { return {-x, -y}; }
Point operator+(const Point& other) const { return Point{x + other.x, y + other.y}; }
Point operator-(const Point& other) const { return operator+(-other); }
};
std::ostream& operator<<(std::ostream& o, const Point& p)
{
return o << "(" << p.x << ", " << p.y << ")";
}
void TestRxCPP()
{
using RxPoint = rx::observable<Point>;
using Strokes = rx::observable<RxPoint>;
using StrokesSubject = rxsub::subject<RxPoint>;
StrokesSubject mouseSource;
auto strokes = mouseSource.get_observable();
auto deltaVectors = [](Strokes strokes) {
auto deltas = strokes.flat_map([=](RxPoint stroke) {
auto points = stroke;
// create stream of delta vectors from start point
auto firstPoint = points.take(1);
auto delta =
points.combine_latest([](Point v0, Point v1) { return v0 - v1; }, firstPoint);
return delta;
});
return deltas;
};
auto delta = deltaVectors(strokes);
int n = 0;
delta.subscribe(
[&](const Point& d) { std::cout << "Delta no. " << n++ << " is: " << d << std::endl; });
auto testMouse = rxob::from(Point{3 + 0, 4 + 0}, Point{3 + 5, 4 + 0}, Point{3 + 0, 4 + 5}, Point{3 + 2, 4 + 3});
mouseSource.get_subscriber().on_next(testMouse);
}
Пример Rx.Net:
void RxNET()
{
var strokesS = new Subject<IObservable<Point>>();
Func<IObservable<IObservable<Point>>, IObservable<Point>>
deltaVectors = strokes =>
{
var deltas = strokes.SelectMany(stroke =>
{
var points = stroke;
// create stream of delta vectors from start point
var firstPoint = points.Take(1);
var deltaP =
points.CombineLatest(firstPoint, (v0, v1) => new Point(v0.X - v1.X, v0.Y - v1.Y));
return deltaP;
});
return deltas;
};
var delta = deltaVectors(strokesS);
var n = 0;
delta.Subscribe(d => { Console.WriteLine($"Delta no {n++} is: {d}\n"); });
var testMouse = new List<Point>
{
new Point(3 + 0, 4 + 0),
new Point(3 + 5, 4 + 0),
new Point(3 + 0, 4 + 5),
new Point(3 + 2, 4 + 3)
}.ToObservable();
strokesS.OnNext(testMouse);
}
Спасибо @Kirk Shoop на rxcpp github 🙂
это поведение HOTVCOLD.
штрихи ХОЛОДНЫЕ и используются совместно, и используется только один поток. points.combine_latest(..., firstPoint)
означает, что все очки отправляются раньше firstPoint
подписан таким образом испускается только последняя дельта.
ХОЛОДНЫЕ и ГОРЯЧИЕ источники будут работать, если вы измените combine_latest
auto delta =
firstPoint.combine_latest([](Point v0, Point v1) { return v1 - v0; }, points);
Других решений пока нет …