微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何以正确的顺序执行 Runnable/Thread

如何解决如何以正确的顺序执行 Runnable/Thread

想象一下这样的数据流

A,A,B,C,B...

现在假设我们有一个 StreamProcessor 来处理流。我们可以并行处理 A、B、C,但必须依次处理个别的 As、B、C。

示例:
线程 1:按顺序处理所有 As
线程2:依次处理所有B 等等……

所以对于 A、B、C,我有一个 StreamProcessor (SP)。

每个流元素都有一个时间戳,因此可以按时间排序(它实际上以正确的顺序出现)。元素必须按时间顺序处理。

所以现在我将所有流元素拆分到它们的处理器(SPA、SPB、SPC)。

我在我添加元素的地方有一个 TreeSet。

所以每当有新元素时,我基本上都会这样做:

 public synchronized void onNewElementReceived(Element element) {
        if (element== null) return;
        treeSet.add(element);
        if(treeSet.size()>30) logger.warn("There are many elements queueing up for processing");
        threadPool.execute(() -> process(treeSet.first()));
    }

private synchronized void process(Element element){
    //Do the processing
}

如果流足够慢以致进程在下一个元素出现之前终止,则这可以正常工作。但如果不是呢?如果有更多元素出现,我如何确保下一个元素也是将要处理的下一个元素?到底什么时候触发哪个线程由操作系统决定?

编辑:为了清楚起见,这是一个失败的例子:

假设 A 元素的 process() 执行需要 1 秒。现在,如果流提供 As 更快,那么我们可以处理它们,我们的 treeSet 将填充 A 类型的元素(我刚刚意识到它不会因为我们立即再次获取它,嗯,另一个问题)无论如何主要问题仍然存在。例如,如果我们每 100 毫秒接收一次元素,我们将请求执行 10 次 process 方法,但顺序将不再得到保证,因为我们不知道系统将首先执行哪个 Runnable。我们只是按照正确的顺序添加了它们,但是如何以正确的顺序执行它们?

我可以想象一直运行一个循环线程来获取队列的第一个元素,如果没有则中止进程。这是一个方法吗?

解决方法

我会这样做(PseudoCode-Like):

Results <- function(x){
for(i in 1:200){
    j <- a[i]
    k <- b[i]
    l <- c[i]
    m <- d[i]
    Negs[i] <- x[1L]*j + x[2L]*k + x[3L]*l + x[4L]*m
    cat()
}
for(i in 1:200){
    n <- e[i]
    o <- f[i]
    p <- g[i]
    q <- h[i]
    Poss[i] <- x[1L]*n + x[2L]*o + x[3L]*p + x[4L]*q
}
print(Negs)
print(Poss)
}

Results(c(2,1,2,1))

此代码由四个线程组成。

第一个线程从某个未指定的数据源接收元素。

如果这个线程收到一个,它会检查它是什么类型(A、B 或 C)。

这些类型中的每一种都有一个对应的 StreamProcessor。 onNewElementReceived 会将接收到的元素添加到对应的 StreamProcessor 的工作集中。

这些 StreamProcessor 线程中的每一个都会进行检查,直到它们被杀死并阻塞,直到它获得一个 Element,然后调用必须由每个子类实现的方法 abstract class StreamProcessor extends Thread{ private ThreadSafeList<Element> elements; void add(Element e) { elements.addAtEnd(e); } @Override public void run() { while(hasNotFinished()) { //If list has element,return the first element and remove it from the list,otherwise block until one is there and then return the first element and remove it. Element e = elements.blockingRemoveFirst(); this.workWith(e); } } abstract void workWith(Element e); } class StreamProcessorA extends StreamProcessor { @Override public void workWith(Element e) { //Do something } } class StreamProcessorB extends StreamProcessor { @Override public void workWith(Element e) { //Do something } } class StreamProcessorC extends StreamProcessor { @Override public void workWith(Element e) { //Do something } } class ElementReceiver { private StreamProcessor A; private StreamProcessor B; private StreamProcessor C; public synchronized void onNewElementReceived(Element e) { if(e.type() /*Whatever*/ == ElementType.A) { A.add(e); }else if(e.type() == ElementType.B) { B.add(e); }else { C.add(e); } } }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。