微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

ExecutorService 线程在 RxJava 代码中没有按预期工作

如何解决ExecutorService 线程在 RxJava 代码中没有按预期工作

我在 RxJava 代码中使用 java.util.concurrent.ExecutorService 作为调度程序线程池:

  public class Test {
        ExecutorService poolA = newFixedThreadPool(10,threadFactory("Scheduler-A-%d"));
        Scheduler schedulerA = Schedulers.from(poolA);
        private ThreadFactory threadFactory(String pattern) {
        return new ThreadFactoryBuilder()
                .setNameFormat(pattern).build();
        }
        
        @Test
        public void testSubscribeOn() {
            log("Starting");
            final Observable<String> obs = simple();
            log("Created");
            obs
                    .doOnNext(Utils::log)
                    .map(x -> x+1) 
                    .doOnNext(Utils::log)
                    .subscribeOn(schedulerA)
                    .map(x -> x+2) 
                    .doOnNext(Utils::log)
                    .subscribe( 
                         x -> log("Got " + x),Throwable::printstacktrace,() -> log("Completed"));
    
            log("exiting");
        }
    }
    
    public class Utils {
        private static long start = System.currentTimeMillis();
    
        public static void log(Object label) {
            System.out.println(
                    System.currentTimeMillis() - start + "\t| " +
                            Thread.currentThread().getName() + "\t| " +
                            label);
        }
    }

我观察到主线程在调度程序线程开始工作之前退出,因此没有显示调度程序线程的输出。据我所知 ExecutorService 线程不是守护线程,那么为什么会出现这种行为?

解决方法

设置

Linux ThinkPad-P50 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

openjdk version "14.0.2" 2020-07-14
OpenJDK Runtime Environment AdoptOpenJDK (build 14.0.2+12)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 14.0.2+12,mixed mode,sharing)

我认为这与 JUnit-Runner 有关。让我们看看下面的例子:

public class WhileTrueJava {
  public static void main(String[] args) {
    Thread thread =
        new Thread(
            () -> {
              while (true) {}
            });
    thread.setDaemon(false);
    thread.start();

    log("exit");
  }

  private static void log(Object msg) {
    System.out.println(Thread.currentThread().getName() + "-" + msg);
  }
}

当我运行给定的示例时,进程不会退出。当我将其与 JUnit 一起使用时,该过程将完成。我还不能说,为什么会这样。

示例:

class So65650913 {
  static ExecutorService poolA = newFixedThreadPool(10,threadFactory("Scheduler-A-%d"));
  static Scheduler schedulerA = Schedulers.from(poolA);

  public static void main(String[] args) {
    Utils.log("Starting");
    Observable<String> obs =
        Observable.fromCallable(
            () -> {
              Utils.log("fromCallable lambda called");
              Thread.sleep(1_000);
              Utils.log("fromCallable return value");
              return "42";
            });

    Utils.log("Created");

    Disposable completed =
        obs.doOnNext(Utils::log)
            .map(x -> x + 1)
            .doOnNext(Utils::log)
            .subscribeOn(schedulerA)
            .map(x -> x + 2)
            .doOnNext(Utils::log)
            .subscribe(
                x -> Utils.log("Got " + x),Throwable::printStackTrace,() -> Utils.log("Completed"));

    Utils.log("exiting");
  }

  private static ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder().setDaemon(false).setNameFormat(pattern).build();
  }
}

class Utils {
  private static final long start = System.nanoTime();

  private Utils() {}

  public static void log(Object label) {
    System.out.println(
            (System.nanoTime()
            - start) / 1_000_000
            + "\t| "
            + Thread.currentThread().getName()
            + "\t| "
            + label);
  }
}

输出

0   | main  | Starting
62  | main  | Created
77  | main  | exiting
78  | Scheduler-A-0 | fromCallable lambda called
1078    | Scheduler-A-0 | fromCallable return value
1082    | Scheduler-A-0 | 42
1082    | Scheduler-A-0 | 421
1082    | Scheduler-A-0 | 4212
1082    | Scheduler-A-0 | Got 4212
1082    | Scheduler-A-0 | Completed

看起来进程不会停止,直到 ThreadPool 被击落并执行所有正在运行的任务。

只需在 poolA.shutdown(); 方法的末尾添加 main,看看会发生什么。我会说,行为符合预期。

更新

JUnit 行为不同,因为以下代码^1

public static void main(String... args) {
    int exitCode = execute(System.out,System.err,args).getExitCode();
    System.exit(exitCode);
}

看起来,当主线程失败时,进程会被System.exit杀死。因此,这是JUnit ConsoleLauncher 的错。

^1 JUnit Github https://github.com/junit-team/junit5/blob/ae2a336fe4b371d398da386e8b336cc06329da7d/junit-platform-console/src/main/java/org/junit/platform/console/ConsoleLauncher.java

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。