如何在 Rx.NET 中组合两个不同的 GroupedStreams?

如何解决如何在 Rx.NET 中组合两个不同的 GroupedStreams?

This question 类似,但不适用于我的情况,因为用户需要合并来自同一个 IGroupedobservable 的可观察流,而我想合并来自不同组的流。

我有以下结构和流:

type A = {
  Id: int
  Value: int
}

type B = {
  Id: int
  Value: int
}

//subjects to test input,just any source of As and Bs
let subjectA: Subject<A> = Subject.broadcast
let subjectB: Subject<B> = Subject.broadcast

//grouped streams
let groupedA: IObservable<<IGroupedobservable<int,A>> = Observable.groupBy (fun a -> a.Id) subjectA
let groupedB: IObservable<<IGroupedobservable<int,B>> = Observable.groupBy (fun b -> b.Id) subjectB

当 groupedA.Key = groupedB.Key 时,我想以某种方式合并 A 和 B 的内部可观察值,并获得 (A,B) 对的可观察值,其中 A.Id = B.Id

我想要的签名是这样的 IObservable<IGroupedobservable<int,A>> -> IObservable<IGroupedobservable<int,B>> -> IObservable<IGroupedobservable<int,(A,B)>> where for all (A,B),A.Id = B.Id

我尝试了很多 combineLatest、groupJoin、过滤器和地图变体,但都没有成功。

我将 F# 与 Rx.Net 和 FSharp.Control.Reactive 一起使用,但如果您知道 C#(或任何语言,真的)的答案,请发布

解决方法

这是您可以使用的自定义运算符 GroupJoin。它基于 SelectMergeGroupByWhere 运算符:

/// <summary>
/// Groups and joins the elements of two observable sequences,based on common keys.
/// </summary>
public static IObservable<(TKey Key,IObservable<TLeft> Left,IObservable<TRight> Right)>
    GroupJoin<TLeft,TRight,TKey>(
    this IObservable<TLeft> left,IObservable<TRight> right,Func<TLeft,TKey> leftKeySelector,Func<TRight,TKey> rightKeySelector,IEqualityComparer<TKey> keyComparer = null)
{
    // Arguments validation omitted
    keyComparer ??= EqualityComparer<TKey>.Default;
    return left
        .Select(x => (x,(TRight)default,Type: 1,Key: leftKeySelector(x)))
        .Merge(right.Select(x => ((TLeft)default,x,Type: 2,Key: rightKeySelector(x))))
        .GroupBy(e => e.Key,keyComparer)
        .Select(g => (
            g.Key,g.Where(e => e.Type == 1).Select(e => e.Item1),g.Where(e => e.Type == 2).Select(e => e.Item2)
        ));
}

用法示例:

var subjectA = new Subject<A>();
var subjectB = new Subject<B>();

IObservable<IGroupedObservable<int,(A,B)>> query = subjectA
    .GroupJoin(subjectB,a => a.Id,b => b.Id)
    .SelectMany(g => g.Left.Zip(g.Right,(a,b) => (g.Key,a,b)))
    .GroupBy(e => e.Key,e => (e.a,e.b));
,

我不清楚这是否是您想要的。因此,首先使用运行器代码进行澄清可能会有所帮助。假设以下运行程序代码:

var aSubject = new Subject<A>();
var bSubject = new Subject<B>();

var groupedA = aSubject.GroupBy(a => a.Id);
var groupedB = bSubject.GroupBy(b => b.Id);

//Initiate solution

solution.Merge()
    .Subscribe(t => Console.WriteLine($"(Id = {t.a.Id},AValue = {t.a.Value},BValue = {t.b.Value}  )"));

aSubject.OnNext(new A() { Id = 1,Value = 1 });
aSubject.OnNext(new A() { Id = 1,Value = 2 });

bSubject.OnNext(new B() { Id = 1,Value = 10 });
bSubject.OnNext(new B() { Id = 1,Value = 20 });
bSubject.OnNext(new B() { Id = 1,Value = 30 });

您是否想看到以下输出:

(Id = 1,AValue = 1,BValue = 10)
(Id = 1,AValue = 2,BValue = 20)
(Id = 1,BValue = 30)
(Id = 1,BValue = 30)

如果是这种情况,您可以通过以下方式获得解决方案:

var solution = groupedA.Merge()
    .Join(groupedB.Merge(),_ => Observable.Never<Unit>(),b) => (a,b)
    )
    .Where(t => t.a.Id == t.b.Id)
    .GroupBy(g => g.a.Id);

如果这是一个长期运行的进程的一部分,我会警告这里会有内存/性能影响。这会将所有 AB 对象无限期地保留在内存中,等待查看它们是否可以配对。要缩短它们在内存中的保留时间,请将 Observable.Never() 调用更改为适当的窗口,以了解每个对象在内存中的保留时间。

,

首先,这有你想要的签名:

let cartesian left right =
    rxquery {
        for a in left do
        for b in right do
        yield a,b
    }

let mergeGroups left right =
    rxquery {
        for (leftGroup : IGroupedObservable<'key,'a>) in left do
        for (rightGroup : IGroupedObservable<'key,'b>) in right do
        if leftGroup.Key = rightGroup.Key then
            let merged = cartesian leftGroup rightGroup
            yield {
                new IGroupedObservable<_,_> with
                    member __.Key = leftGroup.Key
                    member __.Subscribe(observer) = merged.Subscribe(observer)
            }
    }

然而,在我的测试中,这些组都是空的。我没有足够的 Rx 经验不知道为什么,但也许其他人知道。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?