微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何检查 Java 8 Stream.forEach() 何时完成迭代?

如何解决如何检查 Java 8 Stream.forEach() 何时完成迭代?

我想使用 Java 8 Stream 提供的并行性,但我还需要按特定顺序完成某些操作,否则一切都会中断。问题是,使用流意味着代码现在是异步的 [注意:这是不正确的],而且我无法弄清楚只有在完成对整个集合的迭代后才能使某些事情发生。 [注意:这会自动发生]

现在,我的代码如下:

    public void iterateOverMap(Map<String,String> m)
    {
        AtomicInteger count = new AtomicInteger(0);
        
        m.keySet().stream().forEach((k) -> {
                    Object o = m.get(k);
                    
                    // do stuff with o
                    
                    count.incrementAndGet();
                });
        
        // spin up a new thread to check if the Stream is done
        new Thread(() -> {
            for (;;)
            {
                if (count.intValue() >= map.size())
                    break;
            }
            afterFinishedIterating();
        }).start();
    }

我不喜欢为了跟踪这件事而不得不启动一个新线程或阻塞主线程的想法,但我想不出我还能怎么做。有人知道更好的选择吗?

谢谢!

解决方法

Stream 处理是同步的。

如果您想要一个关于如何跟踪 Stream 进度的示例,您可以使用 peek() 中间操作,但请记住,它最好用于调试目的>

示例取自my other answer

Stream<MyData> myStream = readData();
final AtomicInteger loader = new AtomicInteger();
int fivePercent = elementsCount / 20;
MyResult result = myStream
    .map(row -> process(row))
    .peek(stat -> {
        if (loader.incrementAndGet() % fivePercent == 0) {
            System.out.println(loader.get() + " elements on " + elementsCount + " treated");
            System.out.println((5*(loader.get() / fivePercent)) + "%");
        }
    })
    .reduce(MyStat::aggregate);
,

事实证明我遇到的问题是我使用的线程安全但不确定的迭代“Collections.synchronizedList(new LinkedList<>())”列表,而不是 Stream 的用法。 Stream 不像我假设的那样是异步的,所以答案只是“您不必这样做;它会为您完成”。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。