微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何批量处理 IAsyncEnumerable<T>,在连续批次之间强制执行最大间隔策略?

如何解决如何批量处理 IAsyncEnumerable<T>,在连续批次之间强制执行最大间隔策略?

我有一个异步消息序列(流),这些消息有时大量到达,有时零星到达,我想以每批 10 条消息为一组处理它们。我还想对接收消息和处理消息之间的延迟强制实施上限,因此如果在收到批次的第一条消息后 5 秒过去了,也应该处理少于 10 条消息的批次。我发现我可以通过使用 Buffer 包中的 System.Interactive.Async 运算符来解决问题的第一部分:

IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
    // Process batch
}

Buffer 运算符的签名:

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source,int count);

不幸的是,Buffer 运算符没有带 TimeSpan 参数的重载,所以我不能那么容易地解决问题的第二部分。我必须自己以某种方式实现一个带有计时器的批处理运算符。我的问题是:如何实现具有以下签名的 Buffer 运算符的变体?

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source,TimeSpan timeSpan,int count);

关于计时器的行为,不需要像我之前描述的那样精确。如果结果序列最多每 5 秒发出一个缓冲区,即使在收到批处理的第一条消息后过去了不到 5 秒,我也可以。发出空缓冲区也可以。我更喜欢行为不完美的简单实现,而不是过于复杂的完美实现。

如果需要,我也可以向我的项目添加外部依赖项,例如 System.Interactive.AsyncSystem.Linq.Async 包。

附言这个问题的灵感来自与通道和内存泄漏相关的 a recent question


澄清:关于我更喜欢​​简单性而不是准确性,我的意思是,如果缓冲区的发出时间比最大延迟策略所要求的时间早,那也没关系。但是,如果违反了这项政策,那就不好了。在任何情况下,消息处理的延迟都不应超过 5 秒。也不允许比每 5 秒更频繁地发出缓冲区,除非缓冲区已满。

解决方法

更新:更改了解决方案以尝试适应 OP 的说明和要求。这种方法利用了很棒的 Nito.AsyncEx 包,它没有被 OP 明确命名为允许的。

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(this IAsyncEnumerable<TSource> source,TimeSpan timeSpan,int count)
{
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (count <= 0)
        throw Error.ArgumentOutOfRange(nameof(count));

    return Core(source,timeSpan,count);

    static async IAsyncEnumerable<IList<TSource>> Core(IAsyncEnumerable<TSource> source,TimeSpan timespan,int count,[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var waitable = new AsyncAutoResetEvent(false);

        var timer = new System.Timers.Timer(timespan.TotalMilliseconds);
        timer.Elapsed += (_,_) => waitable.Set();
        timer.AutoReset = true;
        timer.Start();

        var buffer = new ConcurrentBag<TSource>();
        
        var isCompleted = false;
        var enumerationTask = Task.Run(async () =>
        {
            await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
            {
                buffer.Add(item);

                if (buffer.Count == count)
                {
                    waitable.Set();
                }
            }

            isCompleted = true;
        },cancellationToken);

        while (!isCompleted)
        {
            await Task.WhenAny(enumerationTask,waitable.WaitAsync(cancellationToken));

            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            var bufferedItems = new List<TSource>();
            while (buffer.TryTake(out var item))
            {
                bufferedItems.Add(item);
            }
            
            yield return bufferedItems;
        }

        if (buffer.Any())
        {
            yield return buffer.ToList();
        }

        timer.Dispose();
    }
}

---原帖---

这个怎么样?我对其进行了一些简单的测试,它似乎有效,但您需要运行自己的测试才能确定。改编自原版Buffer

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(this IAsyncEnumerable<TSource> source,[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var buffer = new List<TSource>(count);

        var lastSentTime = DateTime.Now;
        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            buffer.Add(item);

            var timeSinceLastSent = DateTime.Now - lastSentTime;
            
            if (buffer.Count == count || timeSinceLastSent >= timespan)
            {
                yield return buffer;

                lastSentTime = DateTime.Now;
                buffer = new List<TSource>(count);
            }
        }

        if (buffer.Count > 0)
        {
            yield return buffer;
        }
    }
}

编辑:再看一遍,这实际上并不能解决您在收益回报之间有最大时间的问题,所以可能不是您想要的。此解决方案取决于检索下一个元素的时间。无论如何,我都会把这个答案留在这里,以防它对某人有帮助。我试图让事情变得非常简单。

,

这里有两种方法可以解决这个问题。第一个有缺陷,但由于其极其简单,我还是将其发布了。 Buffer 包中已存在带有 TimeSpan 参数的 System.Reactive 运算符,并且 System.Linq.Async 包中存在异步和可观察序列之间的转换器。所以这只是将三个已经可用的运算符链接在一起的问题:

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source,int count)
{
    return source.ToObservable().Buffer(timeSpan,count).ToAsyncEnumerable();
}

不幸的是,这种简洁的方法是有缺陷的,因为从拉到推再回到拉模型的副作用。发生的情况是,中间可观察序列在订阅时开始积极地拉取源 IAsyncEnumerable,而不管结果 IAsyncEnumerable 是如何拉取的。因此,结果序列的使用者不是枚举的驱动程序,而是以源序列允许的最大速度在后台静默地进行枚举,并且生成的消息缓冲在内部队列中。因此,不仅有可能对消息的处理施加隐藏的延迟,而且内存消耗也有可能失控。

第二种是动手操作,使用 Task.Delay 方法作为定时器,使用 Task.WhenAny 方法来协调定时器和枚举任务。这种方法的行为类似于基于 Rx 的方法,除了源序列的枚举由结果序列的使用者驱动,正如人们所期望的那样。

/// <summary>
/// Projects each element of an async-enumerable sequence into a buffer that's
/// sent out when either it's full or a given amount of time has elapsed.
/// </summary>
public static async IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source,[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
    var moveTask = enumerator.MoveNextAsync().AsTask();
    var timerCts = new CancellationTokenSource();
    var delayTask = Task.Delay(timeSpan,timerCts.Token);
    var buffer = new List<TSource>(count);
    while (true)
    {
        var completedTask = await Task.WhenAny(moveTask,delayTask);
        if (completedTask == moveTask)
        {
            if (!await moveTask) break;
            buffer.Add(enumerator.Current);
            if (buffer.Count == count)
            {
                timerCts.Cancel(); timerCts.Dispose();
                yield return buffer.ToArray();
                buffer.Clear();
                timerCts = new CancellationTokenSource();
                delayTask = Task.Delay(timeSpan,timerCts.Token);
            }
            moveTask = enumerator.MoveNextAsync().AsTask();
        }
        else // completedTask == delayTask
        {
            yield return buffer.ToArray();
            buffer.Clear();
            delayTask = Task.Delay(timeSpan,timerCts.Token);
        }
    }
    timerCts.Cancel(); timerCts.Dispose();
    if (buffer.Count > 0)
    {
        yield return buffer.ToArray();
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?