微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

自定义派生连接池正在与Java程序中的公共池一起使用

如何解决自定义派生连接池正在与Java程序中的公共池一起使用

我创建了以下程序,试图将自定义派生连接池传递给该程序,但我不想使用公共连接池,但是即使传递派生连接池后,我仍然看到正在使用通用池。解释为什么会发生

package com.example.javanewfeatures;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ForkJoinPoolExample {

    public static void main(String args[]) throws InterruptedException {

        List<Integer> numbers = buildIntRange();

        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        Thread t1 = new Thread(() -> forkJoinPool.submit(() -> {
            numbers.parallelStream().forEach(n -> {
                try {
                    Thread.sleep(5);
                    System.out.println("Loop 1 : " + Thread.currentThread());
                } catch (InterruptedException e) {

                }
            });
        }).invoke());

        ForkJoinPool forkJoinPool2 = new ForkJoinPool(4);
        Thread t2 = new Thread(() -> forkJoinPool2.submit(() -> {
            numbers.parallelStream().forEach(n -> {
                try {
                    Thread.sleep(5);
                    System.out.println("Loop 2 : " + Thread.currentThread());
                } catch (InterruptedException e) {

                }
            });
        }).invoke());

        t1.start();
        t2.start();
        t1.join();
        t2.join();

    }

    private static List<Integer> buildIntRange() {
        return IntStream.range(0,10).Boxed().collect(Collectors.toUnmodifiableList());
    }

}

解决方法

...但是我仍然看到即使在通过fork联接池后仍在使用公共池

当然,当您创建ForkJoinPool的实例时,将不使用公共池。您可以打印以下语句以确保不是这种情况。

System.out.printf("Common Pool:%s\n",ForkJoinPool.commonPool());
System.out.printf("Custom Pool:%s\n",new ForkJoinPool(4));

但是在您的情况下,您的任务不使用公共池,Streams使用公共池进行并行计算。

现在,如果您希望流使用自定义池,则可以参考此帖子-Parallel streams in custom pool

在您当前的实现中,这就是行为。

  • 您正在使用ForkJoinPool.submit()将任务提交到池中。这样可以确保任务在自定义池中执行。
  • 但是将其张贴在返回的任务上,您正在调用ForkJoinTask.invoke()。这次,将在不是FJ线程的Thread t1上触发任务,因此任务将被提交到公共池。请参阅下面的ForkJoinTask.doInvoke()源代码:
  private int doInvoke() {
      int s; Thread t; ForkJoinWorkerThread wt;
      return (s = doExec()) < 0 ? s :
          ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
          (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue,this) :
          externalAwaitDone();
  }

  private int externalAwaitDone() {
      int s;
      ForkJoinPool cp = ForkJoinPool.common;
  //...
  }
  • 如果您观察到输出,将得到重复的结果。通过提交执行将使用自定义池,通过调用执行将使用公共池。
    ...
    Loop 1 : Thread[ForkJoinPool-2-worker-0,5,main]
    Loop 1 : Thread[ForkJoinPool.commonPool-worker-3,main]
    ...
    

要纠正您的实现,如上述参考文章所述,您可以进行以下更改

Thread t2 = new Thread(() -> {
    try {
        forkJoinPool2.submit(() -> {
            numbers.stream().forEach(n -> {
                try {
                    Thread.sleep(5);
                    System.out.println("Loop 2 : " + Thread.currentThread());
                } catch (InterruptedException e) {

                }
            });
        }).get(); /*change invoke to get and catch the exception*/
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
});

输出:

Loop 2 : Thread[ForkJoinPool-2-worker-1,main]
Loop 1 : Thread[ForkJoinPool-1-worker-1,main]
Loop 1 : Thread[ForkJoinPool-1-worker-3,main]
Loop 1 : Thread[ForkJoinPool-1-worker-2,main]
Loop 1 : Thread[ForkJoinPool-1-worker-0,main]
Loop 2 : Thread[ForkJoinPool-2-worker-1,main]

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。