如何解决多个生产者-消费者问题坚持最后一次消费
我正在尝试使用 pthreads 和信号量解决多生产者-消费者问题,但它总是坚持在最后一次消费和停止。 它将有 NO_ITEMS 项并假设缓冲区的大小为 BUFFER_SIZE
这是我当前的代码如下。
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <stack>
#define BUFFER_SIZE 50
#define NO_ITEMS 100
using namespace std;
void* thread_producer(void* args);
void* thread_consumer(void* args);
void addItem(int i);
void removeItem();
sem_t fillCount;
sem_t emptyCount;
pthread_mutex_t mutex;
stack<int> items;
static int count = 0;
int main()
{
sem_init(&fillCount,0);
sem_init(&emptyCount,BUFFER_SIZE);
pthread_mutex_init(&mutex,nullptr);
pthread_t p1,c1,c2,c3;
pthread_create(&p1,nullptr,thread_producer,nullptr);
pthread_create(&c1,thread_consumer,nullptr);
pthread_create(&c2,nullptr);
pthread_create(&c3,nullptr);
pthread_join(p1,nullptr);
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(c3,nullptr);
sem_destroy(&fillCount);
sem_destroy(&emptyCount);
pthread_mutex_destroy(&mutex);
return 0;
}
void* thread_consumer(void* args) {
while (count < NO_ITEMS) {
sem_wait(&fillCount);
pthread_mutex_lock(&mutex);
if (!items.empty() && count < NO_ITEMS - 1) {
removeItem();
}
count++;
pthread_mutex_unlock(&mutex);
sem_post(&emptyCount);
}
return nullptr;
}
void* thread_producer(void* args) {
for (int i = 0; i < NO_ITEMS; i++) {
sem_wait(&emptyCount);
pthread_mutex_lock(&mutex);
addItem(i);
// sleep(1);
pthread_mutex_unlock(&mutex);
sem_post(&fillCount);
}
return nullptr;
}
void addItem(int i) {
cout << "Produced: " << i << endl;
items.push(i);
}
void removeItem() {
cout << "Consumed: " << items.top() << endl;
items.pop();
}
这是输出的一部分:
Consumed: 0
Produced: 96
Consumed: 96
Produced: 97
Produced: 98
Consumed: 98
Consumed: 97
Produced: 99 // halt
解决方法
有缺陷的逻辑
您的代码有逻辑问题。假设 NO_ITEMS
是 100,到目前为止已经消耗了 99。让两个消费者线程在那个点到达 while
循环的顶部,并假设两者都将 count
读为 99(但见下文),因此进入循环体。两个消费者都会在 sem_wait()
上阻塞,但最多还有一个要生产的项目,因此生产者最多只会再增加一次信号量,至少让其中一个消费者无限期地阻塞。
未定义的行为
此外,您的 thread_consumer()
函数包含数据竞争,导致您的程序行为未定义。具体而言,count
条件中共享变量 while
的读取未正确同步。虽然不能可靠地预测 UB 将如何表现(否则它不会是“未定义的”),但非同步访问显示一个线程的明显失败以查看其他线程的共享变量更新是相当普遍的。这种失败模式本身就可以解释您观察到的特定行为。
很可能,对这个同步问题的正确修复也能解决逻辑问题。
解决方案
有多种可能的解决方案。以下是一些有前途的:
-
信号量不是特别适合解决这个问题。无论如何你都需要一个互斥锁,它通常用于信号的对应物是一个条件变量。我会将两个信号量转换为两个(或者可能只是一个)普通整数变量,并在生产者和消费者中使用标准的互斥锁 + CV 模式。这将包括为消费者中的
count
读取添加互斥保护。 -
另一方面,如果您必须使用信号量,那么您可以
- 为消费者对
count
的读取添加适当的互斥保护 - 一定要保留消费者在成功递减信号量后是否可以实际消费商品的测试
- 在加入生产者线程之后但在尝试加入消费者之前,让主程序向
fillCount
发布两次(消费者数量 - 1 次)。这将取消阻止任何认为自己可以消费某件商品但在最后一件商品被另一位消费者消费后最终仍在等待的消费者。
- 为消费者对
-
或者你可以使用混合:保留
emptyCount
信号量来限制在任何给定时间等待的项目数量(而不是为此目的切换到 CV),但切换到互斥量 +用于管理消费者的简历模式。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。