如何合并具有有限并发和有限缓冲区容量的嵌套可观察 IObservable<IObservable<T>>?

如何解决如何合并具有有限并发和有限缓冲区容量的嵌套可观察 IObservable<IObservable<T>>?

我注意到 Rx Merge 运算符接受一个可选的 maxConcurrent 参数。这可用于通过同时订阅有限数量的子序列来限制最大并发。当新子序列的推送速度低于订阅子序列的完成速度时,它可以完美地工作,但是当新子序列的推送速度比这个速度快时,它就会出现问题。发生的情况是子序列被缓冲在一个内部缓冲区中,其大小永远增加,而且当前订阅的子序列变得越来越老。下面是这个问题的演示:

await Observable
    .Generate(0,_ => true,x => x,_ => TimeSpan.FromMilliseconds(10))
    .Select(_ => Observable
        .Return(DateTime.Now)
        .Do(d => Console.WriteLine(
            $"Then: {d:HH:mm:ss.fff}," +
            $"Now: {DateTime.Now:HH:mm:ss.fff}," +
            $"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
        .Delay(TimeSpan.FromMilliseconds(1000)))
    .Merge(maxConcurrent: 1)
    .Take(10);

每 10 毫秒推送一个新子序列,每个子序列在 1000 毫秒后完成。子序列以最大并发1(顺序)合并。

输出

Then: 12:45:34.019,Now: 12:45:34.054,TotalMemory: 117,040 bytes
Then: 12:45:34.082,Now: 12:45:35.088,TotalMemory: 139,336 bytes
Then: 12:45:34.093,Now: 12:45:36.094,TotalMemory: 146,336 bytes
Then: 12:45:34.114,Now: 12:45:37.098,TotalMemory: 153,216 bytes
Then: 12:45:34.124,Now: 12:45:38.109,TotalMemory: 159,272 bytes
Then: 12:45:34.145,Now: 12:45:39.126,TotalMemory: 167,608 bytes
Then: 12:45:34.156,Now: 12:45:40.141,TotalMemory: 173,952 bytes
Then: 12:45:34.177,Now: 12:45:41.147,TotalMemory: 180,432 bytes
Then: 12:45:34.188,Now: 12:45:42.164,TotalMemory: 186,808 bytes
Then: 12:45:34.209,Now: 12:45:43.175,TotalMemory: 197,208 bytes

(Try it on Fiddle)

内存使用量稳步增长,每个子序列的创建和订阅间的时间差距也越来越大。

我想要的是一个自定义Merge 变体,它有一个大小有限的内部缓冲区。当缓冲区已满时,任何传入子序列都应导致当前最旧的缓冲子序列被丢弃。这是理想行为的弹珠图,配置为最大并发数 = 1 和缓冲区容量 = 1:

Source: +----A------B------C------|
A:           +-------a----a---|
B:                  not-subscribed
C:                            +-----c----|
Result: +------------a----a---------c----|
  • 子序列 A 在发出后立即被订阅
  • 然后 B 被发射并存储在缓冲区中,因为 A 还没有完成。
  • 然后发出 C 并替换缓冲区中的 B。结果 B 子序列被丢弃并且从未被订阅
  • 在子序列 A 完成之后,立即订阅了缓冲的子序列 C。
  • 最终结果包含 A 和 C 子序列发出的合并值。

如何实现具有这种特定行为的自定义 Rx 运算符?这是我试图实现的运算符的存根:

public static IObservable<T> MergeBounded<T>(
    this IObservable<IObservable<T>> source,int maximumConcurrency,int boundedCapacity)
{
    return source.Merge(maximumConcurrency);
    // Todo: enforce the boundedCapacity policy somehow
}

解决方法

我想出了一个功能性的解决方案,我不确定它是否可行,只是因为复杂。但我想我涵盖了所有的基础。

首先,如果采用函数式方法,这是一个相对简单的状态机问题:状态需要知道当前正在执行多少个 observable 以及缓冲区队列。可以影响状态的两个事件是一个新的 Observable 进入缓冲区队列(导致缓冲区队列入队),或者当前正在执行的 observable 终止(导致缓冲区队列出队)。

由于状态机基本上意味着 Scan,而 Scan 只能处理一种类型,因此我们必须将我们的两个事件强制转换为一种类型,我在下面将其称为 Message .然后状态机知道所有并可以完成 Merge(n) 重载的工作。

最后一个技巧是回送:由于完成的 Observable 是 Scan 的“下游”,我们需要将该 observable 的终止“回送”到 Scan。为此,我总是参考 [this answer][1] 中的 Drain 函数。

public static class X
{
    public static IObservable<T> MergeBounded<T>(
        this IObservable<IObservable<T>> source,int maximumConcurrency,int boundedCapacity)
    {
        return Observable.Defer(() =>
        {
            var capacityQueue = new Subject<Unit>();

            var toReturn = source.Publish(_source => _source
                .Select(o => Message.Enqueue(o))
                .Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
                .Scan((bufferCount: 0,buffer: ImmutableQueue<IObservable<T>>.Empty,executionCount: 0,item: (IObservable<T>)null),(state,message) =>
                {
                    var buffer = state.buffer;
                    var bufferCount = state.bufferCount;
                    var executionCount = state.executionCount;
                    if (message.IsEnqueue)
                    {
                        if (executionCount < maximumConcurrency)
                            return (0,ImmutableQueue<IObservable<T>>.Empty,executionCount + 1,message.Object);

                        buffer = buffer.Enqueue(message.Object);
                        if (bufferCount == boundedCapacity)
                            buffer = buffer.Dequeue();
                        else
                            bufferCount++;
                        return (bufferCount,buffer,executionCount,null);
                    }
                    else
                    {
                        if (bufferCount == 0)
                            return (0,executionCount - 1,null);
                        else
                            return (bufferCount - 1,buffer.Dequeue(),buffer.Peek());
                    }
                })
                .Where(t => t.item != null)
                .Select(t => t.item)
                .Select(o => o.Do(_ => { },() => capacityQueue.OnNext(Unit.Default)))
                .TakeUntil(_source.IgnoreElements().Materialize())
                .Merge()
            );

            return toReturn;
        });

    }

    public class Message
    {
        public static Message<T> Enqueue<T>(T t)
        {
            return Message<T>.Enqueue(t);
        }

        public static Message<T> Dequeue<T>(T t)
        {
            return Message<T>.Dequeue(t);
        }

    }

    public class Message<T>
    {
        private readonly T _t;
        private readonly bool _isEnqueue;
        private Message(bool isEnqueue,T t)
        {
            _t = t;
            _isEnqueue = isEnqueue;
        }
        
        public static Message<T> Enqueue(T t)
        {
            return new Message<T>(true,t);
        }

        public static Message<T> Dequeue(T t)
        {
            return new Message<T>(false,t);
        }
        
        public bool IsEnqueue => _isEnqueue;
        public T Object => _t;
    }
}

我写了一些测试代码(基于原始问题)来验证,如果你想利用它。测试通过:

//              T: 0123456789012345678901234567890123
//            T10: 0         1         2         3
//         Source: +----A------B------C------|
//              A:      +-------a----a---|
//              B:             +----------b----b---|
//              C:                    +--------c----|
// ExpectedResult: +------------a----a---------c----|


var ts = new TestScheduler();

var A = ts.CreateHotObservable(
    ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond,"a"),ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond,ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
);
var B = ts.CreateHotObservable(
    ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond,"b"),ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond,ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
);
var C = ts.CreateHotObservable(
    ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond,"c"),ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
var source = ts.CreateHotObservable(
    ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond,A.AsObservable()),ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond,B.AsObservable()),ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond,C.AsObservable()),ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
);
var observer = ts.CreateObserver<string>();
var testResult = source.MergeBounded(1,1);
testResult.Subscribe(observer);

var expected = ts.CreateHotObservable(
    ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond,ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
ts.Start();
//observer.Messages.Dump("Actual");   // Linqpad
//expected.Messages.Dump("Expected"); // Linqpad
ReactiveAssert.AreElementsEqual(expected.Messages,observer.Messages);

(测试代码无一例外通过)

,

这是另一个实现。它不像 Shlomo 的 solution 那样功能完备,因为它不能用 navigator.geolocation.getCurrentPosition 进行配置。内部缓冲区的大小必须至少为 1。

boundedCapacity: 0

此实现基于以下假设:内置的 /// <summary> /// Merges elements from all inner observable sequences into a single observable /// sequence,limiting the number of concurrent subscriptions to inner sequences. /// The unsubscribed inner sequences are stored in a buffer with the specified /// maximum capacity. When the buffer is full,the oldest inner sequence in the /// buffer is dropped and ignored in order to make room for the latest inner /// sequence. /// </summary> public static IObservable<T> MergeBounded<T>( this IObservable<IObservable<T>> source,int boundedCapacity) { if (boundedCapacity < 1) throw new ArgumentOutOfRangeException(nameof(boundedCapacity)); return Observable.Defer(() => { var queue = new Queue<IObservable<T>>(boundedCapacity); return source .Select(inner => { bool oldestDropped = false; lock (queue) { if (queue.Count == boundedCapacity) { queue.Dequeue(); oldestDropped = true; } queue.Enqueue(inner); } if (oldestDropped) return null; return Observable.Defer(() => { lock (queue) return queue.Dequeue(); }); }) .Where(inner => inner != null) .Merge(maximumConcurrency); }); } 运算符从不订阅同一子序列两次。否则,语句 Merge 可能会在空 queue.Dequeue() 上被调用,并导致异常。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?