如何解决如何为foreach创建并行预取
| 给出了在C#,TPL,并行扩展,异步CTP,响应式扩展中执行异步操作的众多新方法。我想知道并行化以下内容的提取和处理的最简单方法是:foreach(string url in urls)
{
var file = FetchFile(url);
ProcessFile(file);
}
附带条件是,虽然可以随时提取文件,但是“ 1”一次只能处理一个文件,应该依次调用。
简而言之,使FetchFile
和ProcessFile
以流水线方式表现(即同时发生)的最简单方法是什么?
解决方法
这是RX方式。此扩展会将uri的蒸汽转换为流的流:
public static IObservable<Stream> RequestToStream(this IObservable<string> source,TimeSpan timeout)
{
return
from wc in source.Select(WebRequest.Create)
from s in Observable
.FromAsyncPattern<WebResponse>(wc.BeginGetResponse,wc.EndGetResponse)()
.Timeout(timeout,Observable.Empty<WebResponse>())
.Catch(Observable.Empty<WebResponse>())
select s.GetResponseStream();
}
用法:
new [] { \"myuri.net\\file1.dat\",\"myuri.net\\file2.dat\" }
.ToObservable()
.RequestToStream(TimeSpan.FromSeconds(5))
.Do(stream = > ProcessStream(stream))
.Subscribe();
编辑:哎呀,还没有注意到文件写入序列化的要求。这部分可以通过使用.Concat来完成,它本质上是一个RX队列(另一个是.Zip)。
让我们有一个.StreamToFile扩展名:
public static IObservable<Unit> StreamToFile(this Tuple<Stream,string> source)
{
return Observable.Defer(() =>
source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
}
现在您可以并行处理Web请求,但可以序列化来自它们的文件:
new[] { \"myuri.net\\file1.dat\",\"myuri.net\\file2.dat\" }
.ToObservable()
.RequestToStream(TimeSpan.FromSeconds(5))
.Select((stream,i) => Tuple.Create(stream,i.ToString() + \".dat\"))
.Select(x => x.StreamToFile())
.Concat()
.Subscribe();
, 给定“ 1”的约束,我想说您应该使用TPL异步获取数据,然后排队引用预加载数据的令牌。然后,您可以拥有一个后台线程,该线程将项目从队列中拉出并将它们逐个交给ProcessFile。这是生产者/消费者模式。
对于队列,您可以看一下BlockingCollection,它可以提供一个线程安全的队列,并且还具有能够减轻工作量的良好效果。
, 由于我不了解所有花哨的机制,因此我可能会以旧的方式进行操作,尽管我怀疑它会被归类为“简单”:
var q = new Queue<MyFile>();
var ev = new ManualResetEvent(false);
new System.Threading.Thread(() =>
{
while ( true )
{
ev.WaitOne();
MyFile item;
lock (q)
{
item = q.Dequeue();
if ( q.Count == 0 )
ev.Reset();
}
if ( item == null )
break;
ProcessFile(item);
}
}).Start();
foreach(string url in urls)
{
var file = FetchFile(url);
lock (q)
{
q.Enqueue(file);
ev.Set();
}
}
lock (q)
{
q.Enqueue(null);
ev.Set();
}
, 异步实际上并不表示并行。这仅表示您不会阻止等待其他操作。但是您可以利用异步I / O来在下载URL时不阻塞线程,即,如果您这样做,则不需要与url一样多的线程来并行下载它们:
var client = new WebClient();
var syncLock = new object();
TaskEx.WhenAll(urls.Select(url => {
client.DownloadDataTaskAsync(url).ContinueWith((t) => {
lock(syncLock) {
ProcessFile(t.Result);
}
});
}));
基本上,我们为每个URL创建一个异步下载任务,然后在完成任何任务后,调用一个继续操作,该继续操作使用一个普通对象作为同步锁来确保ProcessFile
顺序发生。直到最后一个ProcessFile
延续完成,12ѭ才会返回。
您可以使用RX的ѭ14the避免显式锁定(但当然它将在内部锁定):
var pipeline = new ReplaySubject<byte[]>();
var files = pipeline.ToEnumerable();
var client = new WebClient();
TaskEx.WhenAll(urls
.Select(download => client.DownloadDataTaskAsync((string) download)
.ContinueWith(t => pipeline.OnNext(t.Result))
)
).ContinueWith(task => pipeline.OnCompleted(task));
foreach(var file in files) {
ProcessFile(file);
}
在这里,我们使用ѭ14作为文件下载渠道。每次下载均异步完成,并将结果发布到the17ѭ阻止的管道中(即顺序发生)。当所有任务完成时,我们完成可观察的事物,它退出了foreach
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。