如何解决在C#8 IAsyncEnumerable <T>中并行化收益率返回
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
foreach (var item in listofWorkItems)
{
yield return DoWork(item);
}
}
呼叫者:
public async Task LogResultsAsync()
{
await foreach (var result in DoWorkAsync())
{
Console.WriteLine(result);
}
}
由于DoWork
是一项昂贵的操作,我希望以某种方式对其进行并行化,因此它的工作方式类似于:
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
Parallel.ForEach(listofWorkItems,item =>
{
yield return DoWork(item);
});
}
但是我无法从Parallel.Foreach
内部获得收益收益,所以想知道最好的方法是什么?
返回结果的顺序无关紧要。
谢谢。
编辑:抱歉,我在DoWorkAsync
中遗漏了一些代码,确实是在等待我没有将其放在上面代码中的某些内容,因为这与问题无关。现在更新
Edit2 :DoWork
在我的情况下主要是I / O绑定,它是从数据库中读取数据。
解决方法
根据canton7的建议,您可以使用AsParallel
代替Parallel.ForEach
。
这可以在标准foreach
循环中使用,您可以在其中产生结果:
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
{
yield return result;
}
}
正如西奥多·祖利亚斯(Theodor Zoulias)所述,返回的可枚举实际上根本不是异步的。
如果您只需要使用await foreach
来使用它就不成问题,但是更明确地说,您可以返回IEnumerable
并让调用者对其进行并行化:
public async Task<IEnumerable<Item>> DoWorkAsync()
{
await Something();
return ListOfWorkItems;
}
// Caller...
Parallel.ForEach(await DoWorkAsync(),item =>
{
var result = DoWork(item);
//...
});
尽管如果需要在多个位置调用它,则可能难以维护
,这是一个使用TransformBlock
库中的TPL Dataflow的基本实现:
public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
// Define the dataflow block
var block = new TransformBlock<IWorkItem,IResult>(async item =>
{
return await TransformAsync(item);
},new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 10,// the default is 1
EnsureOrdered = false // the default is true
});
// Feed the block with input data
foreach (var item in workItems)
{
block.Post(item);
}
block.Complete();
// Stream the block's output as IAsyncEnumerable
while (await block.OutputAvailableAsync())
{
while (block.TryReceive(out var result))
{
yield return result;
}
}
// Propagate possible exceptions
await block.Completion;
}
此实现不是完美的,因为如果IAsyncEnumerable
的使用者过早放弃枚举,则TransformBlock
将继续在后台工作,直到处理完所有工作项为止。此外,它也不支持取消,所有受人尊敬的IAsyncEnumerable
生成方法都应支持取消。这些缺少的功能可以相对容易地添加。如果您有兴趣添加它们,请查看this问题。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。