如何解决并行反序列化步骤
有以下管道:
- 项目已产生(生产者在管道外部);
- 项目已反序列化(JSON到Java对象);
- 项目已处理;
目前,这一切都在单个线程中同步发生:
center
或示意性地:
while(producer.next()) {
var item = gson.deserialize(producer.item());
processItem(item);
}
关注点是,反序列化步骤没有副作用,可以并行进行以节省一些世界时间。
总体代码应如下所示:
PRODUCER -> DESERIALIZATION -> CONSUMER
(sync) (sync) (sync)
或示意性地:
var pipeline = new Pipeline<Item>();
pipeline.setProducer(producer);
pipeline.setDeserialization(gson::deserialize);
pipeline.setConsumer(item -> {
...
});
pipeline.run();
- 同步;
- 按照原始生产者的顺序生产编码后的物品。
问。是否有标准化的方式来编码这样的管道?
解决方法
尝试
while(producer.next()) {
CompletableFuture.supplyAsync(()-> gson.deserialize(producer.item()))
.thenRunAsync(item->processItem(item));
}
,
实现模式的一种方法是:
- 构造一个多线程执行程序来处理解码请求
- 拥有使用者队列;每次您提交要解码的商品时,还要将相应的Future对象添加到消费者队列中
- 让使用者线程坐在等待将物品从队列中取出的状态(因此将按发布的顺序使用它们),调用相应的get()方法[等待物品被解码]
所以“消费者”看起来像这样:
BlockingQueue<Future<Item>> consumerQueue = new LinkedBlockingDeque<>();
Thread consumerThread = new Thread(() -> {
try {
while (true) {
Future<Item> item = consumerQueue.take();
try {
// Get the next decoded item that's ready
Item decodedItem = item.get();
// 'Consume' the item
...
} catch (ExecutionException ex) {
}
}
} catch (InterruptedException irr) {
}
});
consumerThread.start()
与此同时,带有多线程“解码器”的“生产者”端看起来像这样:
ExecutorService decoder = Executors.newFixedThreadPool(4);
while (!producer.hasNext()) {
Item item = producer.next()
// Submit the decode job for asynchronous processing
Future<Item> p = decoder.submit(() -> {
item.decode();
},item);
// Also queue this decode job for future consumption once complete
consumerQueue.add(p);
}
作为一个单独的问题,我想知道您实际上是否会在实践中看到很多好处,因为通过坚持以相同顺序进行消耗,您必然会在过程中引入一系列条件。但是从技术上讲,这是您可以实现自己所追求的目标的一种方式。
P.S。如果您不想使用单独的使用者线程,则相同的“生产者”线程可以轮询队列中已完成的项目并按行执行。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。