如何解决如何批量处理 IAsyncEnumerable<T>,在连续批次之间强制执行最大间隔策略?
我有一个异步消息序列(流),这些消息有时大量到达,有时零星到达,我想以每批 10 条消息为一组处理它们。我还想对接收消息和处理消息之间的延迟强制实施上限,因此如果在收到批次的第一条消息后 5 秒过去了,也应该处理少于 10 条消息的批次。我发现我可以通过使用 Buffer
包中的 System.Interactive.Async 运算符来解决问题的第一部分:
IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
// Process batch
}
Buffer
运算符的签名:
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source,int count);
不幸的是,Buffer
运算符没有带 TimeSpan
参数的重载,所以我不能那么容易地解决问题的第二部分。我必须自己以某种方式实现一个带有计时器的批处理运算符。我的问题是:如何实现具有以下签名的 Buffer
运算符的变体?
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source,TimeSpan timeSpan,int count);
关于计时器的行为,不需要像我之前描述的那样精确。如果结果序列最多每 5 秒发出一个缓冲区,即使在收到批处理的第一条消息后过去了不到 5 秒,我也可以。发出空缓冲区也可以。我更喜欢行为不完美的简单实现,而不是过于复杂的完美实现。
如果需要,我也可以向我的项目添加外部依赖项,例如 System.Interactive.Async 或 System.Linq.Async 包。
附言这个问题的灵感来自与通道和内存泄漏相关的 a recent question。
澄清:关于我更喜欢简单性而不是准确性,我的意思是,如果缓冲区的发出时间比最大延迟策略所要求的时间早,那也没关系。但是,如果违反了这项政策,那就不好了。在任何情况下,消息处理的延迟都不应超过 5 秒。也不允许比每 5 秒更频繁地发出缓冲区,除非缓冲区已满。
解决方法
更新:更改了解决方案以尝试适应 OP 的说明和要求。这种方法利用了很棒的 Nito.AsyncEx 包,它没有被 OP 明确命名为允许的。
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(this IAsyncEnumerable<TSource> source,TimeSpan timeSpan,int count)
{
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (count <= 0)
throw Error.ArgumentOutOfRange(nameof(count));
return Core(source,timeSpan,count);
static async IAsyncEnumerable<IList<TSource>> Core(IAsyncEnumerable<TSource> source,TimeSpan timespan,int count,[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var waitable = new AsyncAutoResetEvent(false);
var timer = new System.Timers.Timer(timespan.TotalMilliseconds);
timer.Elapsed += (_,_) => waitable.Set();
timer.AutoReset = true;
timer.Start();
var buffer = new ConcurrentBag<TSource>();
var isCompleted = false;
var enumerationTask = Task.Run(async () =>
{
await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
{
buffer.Add(item);
if (buffer.Count == count)
{
waitable.Set();
}
}
isCompleted = true;
},cancellationToken);
while (!isCompleted)
{
await Task.WhenAny(enumerationTask,waitable.WaitAsync(cancellationToken));
if (cancellationToken.IsCancellationRequested)
{
break;
}
var bufferedItems = new List<TSource>();
while (buffer.TryTake(out var item))
{
bufferedItems.Add(item);
}
yield return bufferedItems;
}
if (buffer.Any())
{
yield return buffer.ToList();
}
timer.Dispose();
}
}
---原帖---
这个怎么样?我对其进行了一些简单的测试,它似乎有效,但您需要运行自己的测试才能确定。改编自原版Buffer。
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(this IAsyncEnumerable<TSource> source,[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var buffer = new List<TSource>(count);
var lastSentTime = DateTime.Now;
await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
{
buffer.Add(item);
var timeSinceLastSent = DateTime.Now - lastSentTime;
if (buffer.Count == count || timeSinceLastSent >= timespan)
{
yield return buffer;
lastSentTime = DateTime.Now;
buffer = new List<TSource>(count);
}
}
if (buffer.Count > 0)
{
yield return buffer;
}
}
}
编辑:再看一遍,这实际上并不能解决您在收益回报之间有最大时间的问题,所以可能不是您想要的。此解决方案取决于检索下一个元素的时间。无论如何,我都会把这个答案留在这里,以防它对某人有帮助。我试图让事情变得非常简单。
,这里有两种方法可以解决这个问题。第一个有缺陷,但由于其极其简单,我还是将其发布了。 Buffer
包中已存在带有 TimeSpan
参数的 System.Reactive 运算符,并且 System.Linq.Async 包中存在异步和可观察序列之间的转换器。所以这只是将三个已经可用的运算符链接在一起的问题:
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source,int count)
{
return source.ToObservable().Buffer(timeSpan,count).ToAsyncEnumerable();
}
不幸的是,这种简洁的方法是有缺陷的,因为从拉到推再回到拉模型的副作用。发生的情况是,中间可观察序列在订阅时开始积极地拉取源 IAsyncEnumerable
,而不管结果 IAsyncEnumerable
是如何拉取的。因此,结果序列的使用者不是枚举的驱动程序,而是以源序列允许的最大速度在后台静默地进行枚举,并且生成的消息缓冲在内部队列中。因此,不仅有可能对消息的处理施加隐藏的延迟,而且内存消耗也有可能失控。
第二种是动手操作,使用 Task.Delay
方法作为定时器,使用 Task.WhenAny
方法来协调定时器和枚举任务。这种方法的行为类似于基于 Rx 的方法,除了源序列的枚举由结果序列的使用者驱动,正如人们所期望的那样。
/// <summary>
/// Projects each element of an async-enumerable sequence into a buffer that's
/// sent out when either it's full or a given amount of time has elapsed.
/// </summary>
public static async IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source,[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
var moveTask = enumerator.MoveNextAsync().AsTask();
var timerCts = new CancellationTokenSource();
var delayTask = Task.Delay(timeSpan,timerCts.Token);
var buffer = new List<TSource>(count);
while (true)
{
var completedTask = await Task.WhenAny(moveTask,delayTask);
if (completedTask == moveTask)
{
if (!await moveTask) break;
buffer.Add(enumerator.Current);
if (buffer.Count == count)
{
timerCts.Cancel(); timerCts.Dispose();
yield return buffer.ToArray();
buffer.Clear();
timerCts = new CancellationTokenSource();
delayTask = Task.Delay(timeSpan,timerCts.Token);
}
moveTask = enumerator.MoveNextAsync().AsTask();
}
else // completedTask == delayTask
{
yield return buffer.ToArray();
buffer.Clear();
delayTask = Task.Delay(timeSpan,timerCts.Token);
}
}
timerCts.Cancel(); timerCts.Dispose();
if (buffer.Count > 0)
{
yield return buffer.ToArray();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。