微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用多核和多线程来获得最大的cpu和I / O容量以加速c ++中的Program?

如何解决如何使用多核和多线程来获得最大的cpu和I / O容量以加速c ++中的Program?

我写了一个运行python multiprocess和multithread的c ++程序。 我们的输入包含多个文件夹,其中包含超过20000个文件, 我们从磁盘读取文件,然后对其进行处理,最后将其写入磁盘。

Python的工作是在线程之间分配这些文件

Ps。我用C ++读取文件

我想尝试c / c ++,多核和多线程来加速我的程序。
有一些软件包,例如openmp和ppl,pthreads ...,但是我不能使用它们的功能,看来open mp使用的是cpu的最大容量(100%),但是它并没有提高处理速度....我测试了300个文件和结果是:-没有多核和多线程,需要15秒(20%的cpu使用率)-需要30秒(100%的cpu使用率)

我还检查了磁盘使用情况,它表明有3-4%的磁盘正在工作,对于这种工作来说这是非常低的。
我找不到原因和问题所在。

我非常感谢您的帮助,这是我的代码

template<typename Vector>
auto split_vector(const Vector& v,unsigned number_lines) {
    using Iterator = typename Vector::const_iterator;
    vector<Vector> rtn;
    Iterator it = v.cbegin();
    const Iterator end = v.cend();

    while (it != end) {
        Vector v;
        back_insert_iterator<Vector> inserter(v);
        const auto num_to_copy = min(static_cast<unsigned>(distance(it,end)),number_lines);
        copy(it,it + num_to_copy,inserter);
        rtn.push_back(move(v));
        advance(it,num_to_copy);
    }

    return rtn;
}



bool ListFiles(wstring inPath,wstring outPath,vector<pair<wstring,wstring>>& inputFiles,wstring mask) {
    HANDLE hFind = INVALID_HANDLE_VALUE;
    WIN32_FIND_DATA ffd;
    wstring spec;
    wstring spec2;
    stack<wstring> directories;
    stack<wstring> outDirectories;

    directories.push(inPath);
    inputFiles.clear();
    outDirectories.push(outPath);

    while (!directories.empty()) {
        inPath = directories.top();
        spec = inPath + L"\\" + mask;
        directories.pop();

        outPath = outDirectories.top();
        spec2 = outPath + L"\\" + mask;
        outDirectories.pop();

        hFind = FindFirstFile(spec.c_str(),&ffd);
        if (hFind == INVALID_HANDLE_VALUE) {
            return false;
        }

        do {
            if (wcscmp(ffd.cFileName,L".") != 0 &&
                wcscmp(ffd.cFileName,L"..") != 0) {
                if (ffd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
                    directories.push(inPath + L"\\" + ffd.cFileName);
                    outDirectories.push(outPath + L"\\" + ffd.cFileName);
                }
                else {
                    inputFiles.push_back(make_pair(inPath + L"\\" + ffd.cFileName,outPath + L"\\" + L"m_" + ffd.cFileName));
                }
            }
        } while (FindNextFile(hFind,&ffd) != 0);

        if (GetLastError() != ERROR_NO_MORE_FILES) {
            FindClose(hFind);
            return false;
        }

        FindClose(hFind);
        hFind = INVALID_HANDLE_VALUE;
    }
    return true;
}

void Thread(vector<pair<wstring,wstring>>& v)
{
    vector<thread> tv;
    for (int i = 0; i < v.size(); i++) {
        tv.emplace_back(work,v[i].first /*input file*/,v[i].second  /*output file*/);
    }

    for (auto& t : tv) {
        t.join();
    }
}


void Async(vector<pair<wstring,wstring>>v)
{
    for (int i = 0; i < v.size(); i++) {
        future<void> result(async(launch::async,work,v[i].second  /*output file*/));
    }
}


void Process(vector<pair<wstring,wstring>>& v,const int threadCount)
{
    int i,n = v.size();
#pragma omp parallel
    {
#pragma omp  for default(none) private(i) shared(v) schedule(dynamic,threadCount)
        for (i = 0; i < n; i++)
        {
            try
            {
                work(v[i].first /*input file*/,v[i].second  /*output file*/);
            }
            catch (const exception& exc)
            {
                printf("%s\n",exc.what());
            }
        }

    };
}


void Process2(vector<pair<wstring,const int threadCount)
{
    auto vectors = split_vector(v,threadCount);
    int i,n = vectors.size();

#pragma omp parallel
    {
#pragma omp  for  Nowait
        for (i = 0; i < n; i++)
        {
            try
            {
                Async(vectors[i]);
            }
            catch (const exception& exc)
            {
                printf("%s\n",exc.what());
            }
        }

    };
}

void Process3(vector<pair<wstring,n = vectors.size();

#pragma omp parallel
    {
#pragma omp  for  Nowait
        for (i = 0; i < n; i++)
        {
            try
            {
                Thread(vectors[i]);
            }
            catch (const exception& exc)
            {
                printf("%s\n",exc.what());
            }
        }

    };
}


int wmain(int argc,wchar_t** argv) {

    if (argc != 7) return EXIT_FAILURE;

    auto start = high_resolution_clock::Now();

    const wstring input_dir(argv[1]);
    if (input_dir.empty()) return EXIT_FAILURE;


    const wstring output_dir(argv[2]);
    if (output_dir.empty())  return EXIT_FAILURE;


    int p = thread::hardware_concurrency();

    int processor_count = _wtoi(argv[3]) > p ? p : _wtoi(argv[3]);
    int thread_count = _wtoi(argv[4]);
    int level = _wtoi(argv[5]);

    vector<pair<wstring,wstring>> inFiles;
    ListFiles(input_dir,output_dir,inFiles,L"*");
    omp_set_num_threads(processor_count);
    Process(inFiles,thread_count);
    //Process2(inFiles,thread_count);
    //Process3(inFiles,thread_count);


    auto stop = high_resolution_clock::Now();
    auto duration = duration_cast<seconds>(stop - start);
    printf("Duration: %d",duration);
    return 0;
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。