微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何实现多生产者多消费者单缓冲模型?

如何解决如何实现多生产者多消费者单缓冲模型?

一个缓冲区,多个生产者和多个消费者。任何生产者(只有一个)可以在缓冲区为空时写入缓冲区。任何消费者(只有一个)可以在缓冲区已满时读取缓冲区。如何实现这样的模型? 我尝试使用 pthread_cond_wait 和 pthread_cond_signal。但我没有找到解决方案。

解决方法

以下多生产者多消费者单缓冲区实现是使用 Ada 实现的。受保护的对象是缓冲区。有两个生产者和两个消费者。受保护对象隐式处理所有锁定和条件变量。

生产者任务类型、消费者任务类型和受保护对象都定义在一个 Ada 包中。

包装规格为:

-----------------------------------------------------------------------
-- Producer-consumer with bounded buffer
-----------------------------------------------------------------------
generic
   Capacity : Positive;
package Bounded_PC is
   task type Producer is
      entry set_id(Id : in Positive);
      entry Stop;
   end Producer;

   task type Consumer is
      entry set_id(Id : in Positive);
      entry Stop;
   end Consumer;

end Bounded_PC;

包体包含任务类型和共享缓冲区的实现。

with Ada.Text_IO; use Ada.Text_IO;

package body Bounded_PC is
   subtype Index_T is Positive range 1..Capacity;
   type Buf_Array is array (Index_T) of Integer;

   ------------
   -- Buffer --
   ------------

   protected Buffer is
      Entry Write(Item : in Integer);
      Entry Read(Item : out Integer);
   private
      Buf : Buf_Array;
      Write_Index : Index_T := 1;
      Read_Index  : Index_T := 1;
      Count       : Natural := 0;
   end Buffer;


   protected body Buffer is

      entry Write(Item : in Integer) when Count < Capacity is
      begin
         Buf(Write_Index) := Item;
         Write_Index := (Write_Index mod Capacity) + 1;
         Count := Count + 1;
      end Write;

      entry Read(Item : out Integer) when Count > 0 is
      begin
         Item := Buf(Read_Index);
         Read_Index := (Read_Index mod Capacity) + 1;
         Count := Count - 1;
      end Read;

   end Buffer;

   --------------
   -- Producer --
   --------------

   task body Producer is
      Value : Integer := 0;
      Me : Positive;
   begin
      accept Set_Id(Id : in Positive) do
         Me := Id;
      end Set_Id;

      loop
         select
            accept Stop;
            exit;
         else
            select
               Buffer.Write(Value);
               Put_Line("Producer" & Me'Image & " wrote" & Value'Image);
               Value := Value + 1;
            or
               delay 0.001;
            end select;
         end select;
      end loop;
   end Producer;

   --------------
   -- Consumer --
   --------------

   task body Consumer is
      Value : Integer;
      Me : Positive;
   begin
      accept Set_Id(Id : in Positive) do
         Me := Id;
      end Set_Id;

      loop
         select
            accept Stop;
            exit;
         else
            select
               Buffer.Read(Value);
               Put_Line("Consumer" & Me'Image & " read" & Value'Image);
            or
               delay 0.001;
            end select;
         end select;
      end loop;
   end Consumer;

end Bounded_PC;

每个入口调用的条件都在受保护的主体中声明。

entry Write(Item : in Integer) when Count < Capacity

entry Read(Item : out Integer) when Count > 0

这个程序的主要程序是:

with Bounded_PC;

procedure Main is
   package Int_Pck is new Bounded_Pc(10);
   use Int_Pck;

   P1 : Producer;
   P2 : Producer;
   C1 : Consumer;
   C2 : Consumer;
begin
   P1.Set_Id(1);
   P2.Set_Id(2);
   C1.Set_Id(1);
   C2.Set_Id(2);
   delay 0.01;
   P1.Stop;
   P2.Stop;
   delay 0.01;
   C1.Stop;
   C2.Stop;
end Main;
,

我想出了一个解决方案:

bool buffer_empty = true;

// consumer thread:
pthread_mutex_lock(&buffer_lock);
while (buffer_empty) {
    pthread_cond_wait(&buffer_full,&buffer_lock);
}
// consume
buffer_empty = true;
pthread_mutex_unlock(&buffer_lock);


// producer thread:
pthread_mutex_lock(&buffer_lock);
while (!buffer_empty) {
    pthread_cond_wait(&buffer_full,&buffer_lock);
}
// produce
buffer_empty = false;
pthread_mutex_unlock(&buffer_lock);
pthread_cond_signal(&buffer_full);

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。