微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 Mutiny 反应式编程调用长时间运行的阻塞 void 返回方法?

如何解决如何使用 Mutiny 反应式编程调用长时间运行的阻塞 void 返回方法?

我在 Mutiny 的 Uni 上有一个 Async 和 Sync 方法调用链,有些方法一个长期运行的过程,返回类型为 void

在不阻塞下游的情况下调用/调用它们的正确方法是什么?

下面是简单的类比代码

class Root {

    public static void main(String[] args) {

        final Response response = getResponsePayload(); // Gets the the Payload from upstream service
        Uni.createFrom().item(response)
            .onItem().invoke(() -> System.out.println("Process Started"))
            .onItem().call(res -> {
            longRunningMethodAsync(res);    // long running blocking method,I want to run on a worker thread
            return Uni.createFrom().voidItem(); // This line I created,because of the ppipeline will be broken if the Uni is not returned from here
        })
            .onItem().transform(item -> item.hello + " mutiny")
            .onItem().transform(String::toupperCase)
            .subscribe().with(
            item -> System.out.println(">> " + item));  // This is printed to the console
    }



    // Boilerplate method - I created to invoke/call the actual method actual method - `longRunningMethod`,this method basically an adapter
    // This is the best apprach I Could come up,but I'm looking for better thatn this as I'm not conviced I'm doing it right
    private static UniSubscribe<Void> longRunningMethodAsync(final Response response) {

        
        return Uni.createFrom().voidItem().invoke(() -> longRunningMethod(response))
            .runSubscriptionOn(Infrastructure.getDefaultExecutor()).subscribe();
    }


    // Important - this is the method I want to run asynchronously independently of main *event-looP* thread.
    private static void longRunningMethod(final Response response) {
  
        System.out.println("Long running process started"); // Doesn't get printed,which means this is never called at all,not even in the blocked manner by the main even-loop thread
    }




   // Not as importatnt,I provded this in case if you like to run on your local Box
    private static Response getResponsePayload() {
        return new Response();
    }

    private static class Response {
        public final String hello = "hello";
    }
}

解决方法

一般来说,使用 runSubscriptionOn 并传递一个特定的执行器:

longRunningMethodAsync
   .runSubscriptionOn(executor);

请注意,它会将并发限制为 executor 中可用的线程数。

参考:

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?