生产者消费者问题一个生产者多消费者java

如何解决生产者消费者问题一个生产者多消费者java

我正在努力解决生产者消费者问题。当我运行一个生产者和多个消费者时,问题就存在。只有一个消费者消耗缓冲区。据我所知,我的缓冲区实现可能有问题。我该如何解决这个问题?如果它不是缓冲区,我做错了什么?


class Producer extends Thread {
    private final Buffer _buf;
    private final int maxSize;
    private final String name;

    public Producer(Buffer _buf,int maxSize,String name) {
        super(name);
        this._buf = _buf;
        this.maxSize = maxSize;
        this.name = name;
    }

    @Override
    public void run() {
        synchronized (_buf) {
            for (; ; ) {
                while (_buf.isFull()) {
                    try {
                        System.out.println("Buffer is full," + "Producer thread waiting for " + "consumer to take something from buffer");
                        _buf.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                Random random = new Random();
                int i = random.nextInt();
                System.out.println(this.name + " producing value " + i);
                _buf.put(i);
                _buf.notifyAll();
                try {
                    Thread.sleep(200);
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
            }
        }
    }
}

class Consumer extends Thread {
    private final Buffer _buf;
    private final int maxSize;
    private final String name;


    public Consumer(Buffer _buf,String name) {
        super(name);
        this._buf = _buf;
        this.maxSize = maxSize;
        this.name = name;
    }

    @Override
    public void run() {
        synchronized (_buf) {
            for (; ; ) {
                while (_buf.isEmpty()) {
                    System.out.println("Buffer is empty," + "Consumer thread is waiting" + " for producer thread to put something in buffer");
                    try {
                        _buf.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                System.out.println(this.name + ": consuming value " + _buf.get());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
                _buf.notifyAll();
            }
        }
    }
}


class Buffer {
    public synchronized void put(int i) {
        // check for queue overflow
        if (isFull()) {
            System.out.println("Overflow\nProgram Terminated");
            System.exit(1);
        }

        System.out.println("Inserting " + i);

        rear = (rear + 1) % capacity;
        arr[rear] = i;
        count++;
        notifyAll();

    }

    public synchronized int get() {
        if (isEmpty()) {
            System.out.println("Underflow\nProgram Terminated");
            System.exit(1);
        }
        int result = arr[front];

        System.out.println("Removing " + arr[front]);

        front = (front + 1) % capacity;
        count--;
        notifyAll();
        return result;
    }


    private final int[] arr;      // array to store queue elements
    private int front;      // front points to the front element in the queue
    private int rear;       // rear points to the last element in the queue
    private final int capacity;   // maximum capacity of the queue
    private int count;      // current size of the queue

    // Constructor to initialize a buffer queue
    Buffer(int size) {
        arr = new int[size];
        capacity = size;
        front = 0;
        rear = -1;
        count = 0;
    }

    public int size() {
        return count;
    }

    public Boolean isEmpty() {
        return (size() == 0);
    }

    public Boolean isFull() {
        return (size() == capacity);
    }
}

public class PKmain {

    public static void main(String[] args) {
        int maxSize = 100;
        Buffer buffer = new Buffer(10);

        Thread producer = new Producer(buffer,maxSize,"PRODUCER");
        Thread consumer1 = new Consumer(buffer,"CONSUMER 1");
        Thread consumer2 = new Consumer(buffer,"CONSUMER 2");
        Thread consumer3 = new Consumer(buffer,"CONSUMER 3");
        Thread consumer4 = new Consumer(buffer,"CONSUMER 4");
        Thread consumer5 = new Consumer(buffer,"CONSUMER 5");
        Thread consumer6 = new Consumer(buffer,"CONSUMER 6");

        producer.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();
        consumer5.start();
        consumer6.start();
    }
}

这是控制台输出:

Inserting -1893944
PRODUCER producing value 1150242252
Inserting 1150242252
PRODUCER producing value 957139043
Inserting 957139043
PRODUCER producing value -806406909
Inserting -806406909
PRODUCER producing value 1701947892
Inserting 1701947892
PRODUCER producing value -174867893
Inserting -174867893
PRODUCER producing value 1272708996
Inserting 1272708996
PRODUCER producing value -1522880833
Inserting -1522880833
PRODUCER producing value -1041643777
Inserting -1041643777
PRODUCER producing value 1741137093
Inserting 1741137093
Buffer is full,Producer thread waiting for consumer to take something from buffer
Removing -1893944
CONSUMER 6: consuming value -1893944
Removing 1150242252
CONSUMER 6: consuming value 1150242252
Removing 957139043
CONSUMER 6: consuming value 957139043
Removing -806406909
CONSUMER 6: consuming value -806406909
Removing 1701947892
CONSUMER 6: consuming value 1701947892
Removing -174867893
CONSUMER 6: consuming value -174867893
Removing 1272708996
CONSUMER 6: consuming value 1272708996
Removing -1522880833
CONSUMER 6: consuming value -1522880833
Removing -1041643777
CONSUMER 6: consuming value -1041643777
Removing 1741137093
CONSUMER 6: consuming value 1741137093
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
PRODUCER producing value -1656771306
Inserting -1656771306
PRODUCER producing value 146381233
Inserting 146381233
PRODUCER producing value -303301670
Inserting -303301670
...

谢谢!!!

解决方法

只有一个消费者消耗缓冲区。据我所知,我的缓冲区实现可能有问题。我该如何解决这个问题?

对于多个消费者同时处理缓冲区,您需要释放对 _buf 的锁。在您的消费者代码中,您正在模拟睡眠工作。该睡眠不能在 synchronized 块内:

   synchronized (_buf) {
        ...
                // sleeping in a synchronized block is an anti-pattern
                Thread.sleep(1000);
        ...
    }

在您的生产者代码中,您也在 synchronized 块内休眠。您应该关闭锁,然后在外面睡觉,然后重新进入 synchronized 块。

要修复消费者,您应该更改消费者代码,以便有 2 个 synchronized 块:一个用于等待并获取缓冲区,另一个用于通知其他人缓冲区可用。

类似于:

    for (; ; ) {
        int contents;
        synchronized (_buf) {
            while (_buf.isEmpty()) {
                try {
                    _buf.wait();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
            bufferContents = buf.get();
        }
        // now that we have the contents,we can work on them OUTSIDE of the lock
        System.out.println(this.name + ": consuming value " + contents);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException exception) {
            exception.printStackTrace();
        }
        synchronized (_buf) {
           _buf.notifyAll();
        }
    }

如果我们查看 Buffer,任何查看其状态的内容都必须是 synchronized。这意味着 size()isEmpty()isFull() 也需要为 synchronized

当我使用这些修改运行您的代码时,我看到:

PRODUCER producing value 1329586226
Inserting 1329586226
Removing 1329586226
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
CONSUMER 6: consuming value 1329586226
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
PRODUCER producing value 176292473
Inserting 176292473
Removing 176292473
CONSUMER 1: consuming value 176292473
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
PRODUCER producing value -1900116049
Inserting -1900116049
Removing -1900116049
CONSUMER 5: consuming value -1900116049
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
PRODUCER producing value 1933484634
Inserting 1933484634
Removing 1933484634
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
CONSUMER 2: consuming value 1933484634
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
PRODUCER producing value 1398832650
Inserting 1398832650
Removing 1398832650

其他几个评论:

  • 不要捕捉Exception,捕捉InterruptedException并确保在捕捉后执行Thread.currentThread().interrupt()以重新中断线程。
  • 在线程程序中使用 System.out.println(...) 时要小心,因为它是一个同步调用,可以改变线程的时间。
  • Buffer 是方法应该返回 boolean 而不是 Boolean
  • 我认为这是一个练习。如果您真的这样做,我建议您使用 BlockingQueue,它负责在线程之间共享数据以及为您提供锁定和信号。
,

您的消费者无法并行处理缓冲区的原因是 ['id1','id2',...] 部分。因此,当单个消费者获得锁时,其他消费者都无法处理。作为解决方案,我建议在缓冲区操作(这也存在于您的代码中)方面摆脱消费者和生产者方面的锁定。代码可能如下所示:

制作人部分:

synchronized (_buf)

消费者部分:

    @Override
    public void run() {
        for (; ; ) {

            Random random = new Random();
            int i = random.nextInt();
            System.out.println(this.name + " producing value " + i);
            _buf.put(i);
            try {
                Thread.sleep(200);
            } catch (InterruptedException exception) {
                exception.printStackTrace();
            }
        }

    }

缓冲部分:

    @Override
    public void run() {

        for (; ; ) {
            System.out.println(this.name + ": consuming value " + _buf.get());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException exception) {
                exception.printStackTrace();
            }
        }
    }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)> insert overwrite table dwd_trade_cart_add_inc > select data.id, > data.user_id, > data.course_id, > date_format(
错误1 hive (edu)> insert into huanhuan values(1,'haoge'); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive> show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 <configuration> <property> <name>yarn.nodemanager.res