微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

c# – 如何使用HttpWebRequest获取文件并行

我正在尝试像ID​​M这样的程序,可以同时下载文件的部分.
我正在使用的工具来实现这一点,是C#.Net4.5中的TPL
但是当使用任务使操作平行时,我有一个问题.
顺序功能运行正常,正确下载文件.
使用Tasks的并行功能正在运行,直到发生奇怪的事情:
我创建了4个任务,在Factory.StartNew()中,在每个任务中,给出了起始位置和结束位置,任务将下载这些文件,然后返回byte [],一切顺利,任务工作正常,但在某些时候,执行冻结就是这样,程序停止,没有其他事情发生.
并行功能的实现:
static void DownloadPartsParallel()
    {

        string uriPath = "http://mschnlnine.vo.llnwd.net/d1/pdc08/PPTX/BB01.pptx";
        Uri uri = new Uri(uriPath);
        long l = GetFileSize(uri);
        Console.WriteLine("Size={0}",l);
        int granularity = 4;
        byte[][] arr = new byte[granularity][];
        Task<byte[]>[] tasks = new Task<byte[]>[granularity];
        tasks[0] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri,l / granularity));
        tasks[1] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri,l / granularity + 1,l / granularity + l / granularity));
        tasks[2] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri,l / granularity + l / granularity + 1,l / granularity + l / granularity + l / granularity));
        tasks[3] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri,l / granularity + l / granularity + l / granularity + 1,l));//(l / granularity) + (l / granularity) + (l / granularity) + (l / granularity)


        arr[0] = tasks[0].Result;
        arr[1] = tasks[1].Result;
        arr[2] = tasks[2].Result;
        arr[3] = tasks[3].Result;
        Stream localStream;
        localStream = File.Create("E:\\a\\" + Path.GetFileName(uri.LocalPath));
        for (int i = 0; i < granularity; i++)
        {

            if (i == granularity - 1)
            {
                for (int j = 0; j < arr[i].Length - 1; j++)
                {
                    localStream.WriteByte(arr[i][j]);
                }
            }
            else
                for (int j = 0; j < arr[i].Length; j++)
                {
                    localStream.WriteByte(arr[i][j]);
                }
        }
    }

DownloadPartOfFile函数实现:

public static byte[] DownloadPartOfFile(Uri fileUrl,long from,long to)
    {
        int bytesProcessed = 0;
        BinaryReader reader = null;
        WebResponse response = null;
        byte[] bytes = new byte[(to - from) + 1];

        try
        {
            HttpWebRequest request = (HttpWebRequest)WebRequest.Create(fileUrl);
            request.AddRange(from,to);
            request.ReadWriteTimeout = int.MaxValue;
            request.Timeout = int.MaxValue;
            if (request != null)
            {
                response = request.GetResponse();
                if (response != null)
                {
                    reader = new BinaryReader(response.GetResponseStream());
                    int bytesRead;
                    do
                    {
                        byte[] buffer = new byte[1024];
                        bytesRead = reader.Read(buffer,buffer.Length);
                        if (bytesRead == 0)
                        {
                            break;
                        }
                        Array.Resize<byte>(ref buffer,bytesRead);
                        buffer.copyTo(bytes,bytesProcessed);
                        bytesProcessed += bytesRead;
                        Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ",Downloading" + bytesProcessed);
                    } while (bytesRead > 0);
                }
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e.Message);
        }
        finally
        {
            if (response != null) response.Close();
            if (reader != null) reader.Close();
        }

        return bytes;
    }

我试图通过将int.MaxValue设置为读取超时,写入超时和超时来解决它,这就是为什么程序冻结,如果我没有这样做,超时的例外会在功能DownloadPartsParallel
所以有一个解决方案,或任何其他可能有帮助的建议,谢谢.

解决方法

我会使用 HttpClient.SendAsync而不是WebRequest(见 “HttpClient is Here!”).

我不会使用任何额外的线程. HttpClient.SendAsync API自然是异步的,并且返回等待的任务,不需要将其卸载到具有Task.Run/Task.TaskFactory.StartNew的池线程(参见this进行详细的讨论).

我也将使用SemaphoreSlim.WaitAsync()限制并行下载的数量.以下是我作为控制台应用程序(未经广泛测试):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21737681
{
    class Program
    {
        const int MAX_ParaLLEL = 4; // max parallel downloads
        const int CHUNK_SIZE = 2048; // size of a single chunk

        // a chunk of downloaded data
        class Chunk
        {
            public long Start { get; set; }
            public int Length { get; set; }
            public byte[] Data { get; set; }
        };

        // throttle downloads
        SemaphoreSlim _throttleSemaphore = new SemaphoreSlim(MAX_ParaLLEL);

        // get a chunk
        async Task<Chunk> GetChunk(HttpClient client,long start,int length,string url)
        {
            await _throttleSemaphore.WaitAsync();
            try
            {
                using (var request = new HttpRequestMessage(HttpMethod.Get,url))
                {
                    request.Headers.Range = new System.Net.Http.Headers.RangeHeaderValue(start,start + length - 1);
                    using (var response = await client.SendAsync(request))
                    {
                        var data = await response.Content.ReadAsByteArrayAsync();
                        return new Chunk { Start = start,Length = length/*,Data = data*/ };
                    }
                }
            }
            finally
            {
                _throttleSemaphore.Release();
            }
        }

        // download the URL in parallel by chunks
        async Task<Chunk[]> DownloadAsync(string url)
        {
            using (var client = new HttpClient())
            {
                var request = new HttpRequestMessage(HttpMethod.Head,url);
                var response = await client.SendAsync(request);
                var contentLength = response.Content.Headers.ContentLength;

                if (!contentLength.HasValue)
                    throw new InvalidOperationException("ContentLength");

                var numOfChunks = (int)((contentLength.Value + CHUNK_SIZE - 1) / CHUNK_SIZE);

                var tasks = Enumerable.Range(0,numOfChunks).Select(i =>
                {
                    // start a new chunk
                    long start = i * CHUNK_SIZE;
                    var length = (int)Math.Min(CHUNK_SIZE,contentLength.Value - start);
                    return GetChunk(client,start,length,url);
                }).ToList();

                await Task.WhenAll(tasks);

                // the order of chunks is random
                return tasks.Select(task => task.Result).ToArray();
            }
        }

        static void Main(string[] args)
        {
            var program = new Program();
            var chunks = program.DownloadAsync("http://flaglane.com/download/australian-flag/australian-flag-large.png").Result;

            Console.WriteLine("Chunks: " + chunks.Count());
            Console.ReadLine();
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


原文地址:http://msdn.microsoft.com/en-us/magazine/cc163791.aspx 原文发布日期: 9/19/2005 原文已经被 Microsoft 删除了,收集过程中发现很多文章图都不全,那是因为原文的图都不全,所以特收集完整全文。 目录 前言 CLR启动程序
前言 随着近些年微服务的流行,有越来越多的开发者和团队所采纳和使用,它的确提供了很多的优势也解决了很多的问题,但是我们也知道也并不是银弹,提供优势的同时它也给我们的开发人员和团队也带来了很多的挑战。 为了迎接或者采用这些新技术,开发团队需要更加注重一些流程或工具的使用,这样才能更好的适应这些新技术所
最近因为比较忙,好久没有写博客了,这篇主要给大家分享一下PLINQ中的分区。上一篇介绍了并行编程,这边详细介绍一下并行编程中的分区和自定义分区。 先做个假设,假设我们有一个200Mb的文本文件需要读取,怎么样才能做到最优的速度呢?对,很显然就是拆分,把文本文件拆分成很多个小文件,充分利用我们计算机中
在多核CPU在今天和不久的将来,计算机将拥有更多的内核,Microsoft为了利用这个硬件特性,于是在Visual Studio 2010 和 .NET Framework 4的发布及以上版本中,添加了并行编程这个新特性,我想它以后势必会改变我们的开发方式。 在以前或者说现在,我们在并行开发的时候可
c语言输入成绩怎么判断等级
字符型数据在内存中的存储形式是什么
c语言怎么求字符串的长度并输出
c语言函数的三种调用方式是什么
c语言中保留两位小数怎么表示
double的输入格式符是什么