如何解决如何合并具有有限并发和有限缓冲区容量的嵌套可观察 IObservable<IObservable<T>>?
我注意到 Rx Merge
运算符接受一个可选的 maxConcurrent
参数。这可用于通过同时订阅有限数量的子序列来限制最大并发。当新子序列的推送速度低于订阅子序列的完成速度时,它可以完美地工作,但是当新子序列的推送速度比这个速度快时,它就会出现问题。发生的情况是子序列被缓冲在一个内部缓冲区中,其大小永远增加,而且当前订阅的子序列变得越来越老。下面是这个问题的演示:
await Observable
.Generate(0,_ => true,x => x,_ => TimeSpan.FromMilliseconds(10))
.Select(_ => Observable
.Return(DateTime.Now)
.Do(d => Console.WriteLine(
$"Then: {d:HH:mm:ss.fff}," +
$"Now: {DateTime.Now:HH:mm:ss.fff}," +
$"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
.Delay(TimeSpan.FromMilliseconds(1000)))
.Merge(maxConcurrent: 1)
.Take(10);
每 10 毫秒推送一个新子序列,每个子序列在 1000 毫秒后完成。子序列以最大并发1(顺序)合并。
输出:
Then: 12:45:34.019,Now: 12:45:34.054,TotalMemory: 117,040 bytes
Then: 12:45:34.082,Now: 12:45:35.088,TotalMemory: 139,336 bytes
Then: 12:45:34.093,Now: 12:45:36.094,TotalMemory: 146,336 bytes
Then: 12:45:34.114,Now: 12:45:37.098,TotalMemory: 153,216 bytes
Then: 12:45:34.124,Now: 12:45:38.109,TotalMemory: 159,272 bytes
Then: 12:45:34.145,Now: 12:45:39.126,TotalMemory: 167,608 bytes
Then: 12:45:34.156,Now: 12:45:40.141,TotalMemory: 173,952 bytes
Then: 12:45:34.177,Now: 12:45:41.147,TotalMemory: 180,432 bytes
Then: 12:45:34.188,Now: 12:45:42.164,TotalMemory: 186,808 bytes
Then: 12:45:34.209,Now: 12:45:43.175,TotalMemory: 197,208 bytes
内存使用量稳步增长,每个子序列的创建和订阅之间的时间差距也越来越大。
我想要的是一个自定义的 Merge
变体,它有一个大小有限的内部缓冲区。当缓冲区已满时,任何传入子序列都应导致当前最旧的缓冲子序列被丢弃。这是理想行为的弹珠图,配置为最大并发数 = 1 和缓冲区容量 = 1:
Source: +----A------B------C------|
A: +-------a----a---|
B: not-subscribed
C: +-----c----|
Result: +------------a----a---------c----|
- 子序列 A 在发出后立即被订阅。
- 然后 B 被发射并存储在缓冲区中,因为 A 还没有完成。
- 然后发出 C 并替换缓冲区中的 B。结果 B 子序列被丢弃并且从未被订阅。
- 在子序列 A 完成之后,立即订阅了缓冲的子序列 C。
- 最终结果包含 A 和 C 子序列发出的合并值。
如何实现具有这种特定行为的自定义 Rx 运算符?这是我试图实现的运算符的存根:
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,int maximumConcurrency,int boundedCapacity)
{
return source.Merge(maximumConcurrency);
// Todo: enforce the boundedCapacity policy somehow
}
解决方法
我想出了一个功能性的解决方案,我不确定它是否可行,只是因为复杂。但我想我涵盖了所有的基础。
首先,如果采用函数式方法,这是一个相对简单的状态机问题:状态需要知道当前正在执行多少个 observable 以及缓冲区队列。可以影响状态的两个事件是一个新的 Observable 进入缓冲区队列(导致缓冲区队列入队),或者当前正在执行的 observable 终止(导致缓冲区队列出队)。
由于状态机基本上意味着 Scan
,而 Scan
只能处理一种类型,因此我们必须将我们的两个事件强制转换为一种类型,我在下面将其称为 Message
.然后状态机知道所有并可以完成 Merge(n)
重载的工作。
最后一个技巧是回送:由于完成的 Observable 是 Scan
的“下游”,我们需要将该 observable 的终止“回送”到 Scan
。为此,我总是参考 [this answer][1] 中的 Drain
函数。
public static class X
{
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,int maximumConcurrency,int boundedCapacity)
{
return Observable.Defer(() =>
{
var capacityQueue = new Subject<Unit>();
var toReturn = source.Publish(_source => _source
.Select(o => Message.Enqueue(o))
.Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
.Scan((bufferCount: 0,buffer: ImmutableQueue<IObservable<T>>.Empty,executionCount: 0,item: (IObservable<T>)null),(state,message) =>
{
var buffer = state.buffer;
var bufferCount = state.bufferCount;
var executionCount = state.executionCount;
if (message.IsEnqueue)
{
if (executionCount < maximumConcurrency)
return (0,ImmutableQueue<IObservable<T>>.Empty,executionCount + 1,message.Object);
buffer = buffer.Enqueue(message.Object);
if (bufferCount == boundedCapacity)
buffer = buffer.Dequeue();
else
bufferCount++;
return (bufferCount,buffer,executionCount,null);
}
else
{
if (bufferCount == 0)
return (0,executionCount - 1,null);
else
return (bufferCount - 1,buffer.Dequeue(),buffer.Peek());
}
})
.Where(t => t.item != null)
.Select(t => t.item)
.Select(o => o.Do(_ => { },() => capacityQueue.OnNext(Unit.Default)))
.TakeUntil(_source.IgnoreElements().Materialize())
.Merge()
);
return toReturn;
});
}
public class Message
{
public static Message<T> Enqueue<T>(T t)
{
return Message<T>.Enqueue(t);
}
public static Message<T> Dequeue<T>(T t)
{
return Message<T>.Dequeue(t);
}
}
public class Message<T>
{
private readonly T _t;
private readonly bool _isEnqueue;
private Message(bool isEnqueue,T t)
{
_t = t;
_isEnqueue = isEnqueue;
}
public static Message<T> Enqueue(T t)
{
return new Message<T>(true,t);
}
public static Message<T> Dequeue(T t)
{
return new Message<T>(false,t);
}
public bool IsEnqueue => _isEnqueue;
public T Object => _t;
}
}
我写了一些测试代码(基于原始问题)来验证,如果你想利用它。测试通过:
// T: 0123456789012345678901234567890123
// T10: 0 1 2 3
// Source: +----A------B------C------|
// A: +-------a----a---|
// B: +----------b----b---|
// C: +--------c----|
// ExpectedResult: +------------a----a---------c----|
var ts = new TestScheduler();
var A = ts.CreateHotObservable(
ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond,"a"),ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond,ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
);
var B = ts.CreateHotObservable(
ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond,"b"),ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond,ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
);
var C = ts.CreateHotObservable(
ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond,"c"),ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
var source = ts.CreateHotObservable(
ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond,A.AsObservable()),ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond,B.AsObservable()),ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond,C.AsObservable()),ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
);
var observer = ts.CreateObserver<string>();
var testResult = source.MergeBounded(1,1);
testResult.Subscribe(observer);
var expected = ts.CreateHotObservable(
ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond,ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
ts.Start();
//observer.Messages.Dump("Actual"); // Linqpad
//expected.Messages.Dump("Expected"); // Linqpad
ReactiveAssert.AreElementsEqual(expected.Messages,observer.Messages);
(测试代码无一例外通过)
,这是另一个实现。它不像 Shlomo 的 solution 那样功能完备,因为它不能用 navigator.geolocation.getCurrentPosition
进行配置。内部缓冲区的大小必须至少为 1。
boundedCapacity: 0
此实现基于以下假设:内置的 /// <summary>
/// Merges elements from all inner observable sequences into a single observable
/// sequence,limiting the number of concurrent subscriptions to inner sequences.
/// The unsubscribed inner sequences are stored in a buffer with the specified
/// maximum capacity. When the buffer is full,the oldest inner sequence in the
/// buffer is dropped and ignored in order to make room for the latest inner
/// sequence.
/// </summary>
public static IObservable<T> MergeBounded<T>(
this IObservable<IObservable<T>> source,int boundedCapacity)
{
if (boundedCapacity < 1)
throw new ArgumentOutOfRangeException(nameof(boundedCapacity));
return Observable.Defer(() =>
{
var queue = new Queue<IObservable<T>>(boundedCapacity);
return source
.Select(inner =>
{
bool oldestDropped = false;
lock (queue)
{
if (queue.Count == boundedCapacity)
{
queue.Dequeue(); oldestDropped = true;
}
queue.Enqueue(inner);
}
if (oldestDropped) return null;
return Observable.Defer(() =>
{
lock (queue) return queue.Dequeue();
});
})
.Where(inner => inner != null)
.Merge(maximumConcurrency);
});
}
运算符从不订阅同一子序列两次。否则,语句 Merge
可能会在空 queue.Dequeue()
上被调用,并导致异常。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。