微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

无法在生产者-消费者问题中获得第一批的完美同步

如何解决无法在生产者-消费者问题中获得第一批的完美同步

std::vector<infer_request_t*> infer_req_vec;
std::vector<infer_request_t*> infer_req_queue;

std::mutex produce_mutex;
std::condition_variable pcv;
std::condition_variable icv;

void produce_blobs(){

    bool enable_inf_stats = true;
    while(!stop_test && (batchCount < appargs.queryCount || appargs.isInfinite))
    {   
        std::unique_lock<std::mutex> produce_lock(produce_mutex);
        pcv.wait_for(produce_lock,std::chrono::seconds(2),[]{return (((appargs.samplesPerQuery - infer_req_vec.size())>0));});
        if(stop_test || (batchCount >= appargs.queryCount && appargs.queryCount != -1)) break;
        // CODE FOR GENErating IPBLOB AND OPBLOB FOR QUERY
        infer_request_t *infReq = NULL;
        infer_req_vec.push_back(infReq);
        int infer_req_vec_size = infer_req_vec.size();
        status = API(session,hEndPt,model,ipblob,opblob,enable_inf_stats,&infer_req_vec[infer_req_vec_size-1]);
        icv.notify_one();
        produce_lock.unlock();
        
    }
}

void infer_wait(){

    while(!stop_test && (batchCount < appargs.queryCount || appargs.isInfinite ))
    {
        infer_request_t* current_infer_req_wait;
        {
            std::unique_lock<std::mutex> infer_lock(produce_mutex);
            icv.wait(infer_lock,[]{return (((infer_req_vec.size()>0) && (infer_req_vec[0] != NULL)));});
            if(stop_test || (batchCount >= appargs.queryCount && appargs.queryCount != -1)) break;
            infer_req_queue.push_back(infer_req_vec[0]);
            infer_req_vec.erase(infer_req_vec.begin()+0);
            pcv.notify_one();
            infer_lock.unlock();
        }
        int queue_size = infer_req_queue.size();
        for(int i=0; i<queue_size; i++)
        {
            current_infer_req_wait = infer_req_queue[0];
            infer_req_queue.erase(infer_req_queue.begin()+0);
            //FURTHER PROCESSING ON `current_infer_req_wait`
        }
    }
}

int main(int argc,char* argv[]){

    std::thread t1(produce_blobs);
    std::thread t2(infer_wait);

    t1.join();
    t2.join();
}

以上是我写的生产者-消费者问题。

API - 接受一批输入,批号取决于 appargs.samplesPerQuery

我面临的问题是,对于第一批,除非它推送所有 (appargs.samplesPerQuery) 输入和输出 blob,否则 infer_wait 线程不会被执行,即使我正在调用 { {1}} 立即。

这只发生在第一批,从下一次开始,它以完美的同步发生,即 icv.notify_one() 消耗来自 infer_wait() 向量的一个请求,{{ 1}} 再推一个,一直持续到执行结束。

我也想为第一批获得这种完美的同步。

如果我遗漏了什么或做错了什么,请指出。

谢谢

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。