如何解决查找制作人是否完成人物生成
public class Producer implements Runnable {
@Override
public void run() {
Stream<Character> generate = Stream.generate(this::generaterandomCharacter).limit(15);
generate.forEach(character -> {
MyEvent myEvent = new MyEvent();
myEvent.setMesage(character + "");
LOG.info("Producer: " + name + " is waiting to transfer...");
try {
boolean added = transferQueue.tryTransfer(myEvent,4000,TimeUnit.MILLISECONDS);
if (added) {
numberOfProducedMessages.incrementAndGet();
LOG.info("Producer: " + name + " transferred element: A");
} else {
LOG.info("can not add an element due to the timeout");
}
} catch (InterruptedException e) {
e.printstacktrace();
}
});
}
}
提供消费者代码:
public class Consumer implements Runnable {
private static final Logger LOG = Logger.getLogger(Consumer.class.getName());
private final TransferQueue<MyEvent> transferQueue;
private final String name;
final int numberOfMessagesToConsume;
final AtomicInteger numberOfConsumedMessages = new AtomicInteger();
Consumer(TransferQueue<MyEvent> transferQueue,String name,int numberOfMessagesToConsume) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
}
@Override
public void run() {
while (true){
try {
LOG.info("Consumer: " + name + " is waiting to take element...");
MyEvent element = transferQueue.take();
longProcessing(element);
System.out.println("Consumer: " + name + " received element with messgae : " + element.getMesage());
} catch (InterruptedException e) {
e.printstacktrace();
}
}
}
private void longProcessing(MyEvent element) throws InterruptedException {
numberOfConsumedMessages.incrementAndGet();
Thread.sleep(5);
}
}
这是对消费者/生产者的呼吁:
TransferQueue<Event> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer( transferQueue,"1",2);
Consumer consumer = new Consumer(transferQueue,2);
exService.execute(producer);
exService.execute(consumer);
boolean isShutDown = exService.awaitTermination(5000,TimeUnit.MILLISECONDS);
if (!isShutDown) {
exService.shutdown();
}
生产者将仅创建消费者将使用的有限数量的字符。我怎么知道制作人是否完成了角色生成?
我考虑实现超时以了解生产者是否不再发送任何字符,但对于此实现可能有更好的选择。
解决方法
您可以只从生产者发送一个带有消息的事件,例如。 “完成”。
然后在您的消费者中只需检查此消息即可知道流已完成。
超时不是一个好主意,因为它发生的原因可能与关闭流的原因不同。
,有多种替代方法可以实现这一点:
-
使用特殊类型的事件来表明生产者已经完成。 (这基本上就是 Krzysztof Cichocki 所建议的答案)。优点:简单。缺点:您必须确保您选择表示“完成”的任何特殊事件都不可能是生产者发出的真实事件。
-
使用计数。看起来这就是您的代码已经尝试执行的操作。例如,将
numberOfMessagesToConsume
参数中的 15 传递给消费者构造函数,然后Run()
方法在消耗了 15 条消息后停止。优点:简单。缺点:不灵活,你可能事先不知道生产者会产生多少消息。 -
监控生产者线程的状态。例如,消费者可以检查
while (producerThread.isAlive()) {...}
。生产者线程将在完成消息生产后终止。优点:灵活性。缺点:您不希望消费者知道生产者线程,因为耦合太多。例如,您可以使用new Thread(...)
启动生产者,也可以使用ExecutorService
或CompletableFuture
。消费者不需要知道。
缓解选项 3 缺点的一种方法是将函数传递给使用者,以将生产者状态的测试与线程细节分离:
构造函数:
Consumer(TransferQueue<MyEvent> transferQueue,String name,BooleanSupplier isProducerStillProducing)
使用 lambda 调用构造函数:
new Consumer(transferQueue,name,() -> producerThread.isAlive())
在 run()
方法中测试:
while (isProducerStillProducing.getAsBoolean()) { ... }
,
有时协调生产者和消费者的关闭可能是一项非常令人费解的任务。有时,由于语法的差异,一种编程语言比另一种更容易。 以下使用 Ada 编程语言编写的示例创建了一个生产者和一个消费者。生产者向消费者发送一系列字符。消费者在收到每个字符时打印它。当生产者终止时,消费者终止。
此示例使用 Ada Rendezvous 机制在任务(也称为线程)之间进行通信。
with Ada.Text_IO; use Ada.Text_IO;
procedure Main is
task producer;
task consumer is
entry send (Item : in Character);
end consumer;
task body producer is
subtype lower is Character range 'a' .. 'z';
subtype upper is Character range 'A' .. 'Z';
begin
for C in lower loop
consumer.send (C);
delay 0.05;
end loop;
for C in upper loop
consumer.send (C);
delay 0.05;
end loop;
end producer;
task body consumer is
Char : Character;
begin
loop
select
accept send (Item : in Character) do
Char := Item;
end send;
Put (Char);
if Char = 'z' then
New_Line(2);
end if;
or
terminate;
end select;
end loop;
end consumer;
begin
null;
end Main;
这个程序的输出是:
abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。