如何在 Spring Boot RSocket Reactive 中处理入​​站流取消

如何解决如何在 Spring Boot RSocket Reactive 中处理入​​站流取消

目标

我希望在我的 Spring Boot 应用程序中有一个 RSocket 通道端点,我可以在其中处理入站、客户端驱动的流的取消以进行一些服务器端清理。

设置

相关依赖:

  • Spring Boot 2.4.2
  • 科特林 1.4.21
  • Kotlinx 协程 1.4.2
  • RSocket 核心 1.1.0

我已经尝试使用 Kotlin 协程流程和 Reactor Flux(en?)来实现我的目标。下面的两个客户端/服务器对应该做同样的事情:建立一个 RSocket 通道,从客户端发送 2 个“ping”有效载荷,服务器用“pong”有效载荷响应每个,然后客户端关闭连接。

流服务器端:

    @MessageMapping("testFlow")
    fun testPingFlow(input: Flow<String>): Flow<String> {
        val cs = CoroutineScope(EmptyCoroutineContext)
        val output = MutableSharedFlow<String>(10)

        cs.launch {
            try {
                input
                    .catch { e ->
                        logger.error("Rsocket server input error",e)
                    }
                    .onCompletion { exception ->
                        logger.debug("Rsocket server input completed")
                        if (exception != null) {
                            logger.error("Exception received while processing Rsocket server input flow",exception)
                        }
                    }
                    // Normal .collect complains about being internal-only
                    .collectIndexed { _,message ->
                        logger.debug("Rsocket server input received $message")
                        output.emit("pong ${System.currentTimeMillis()}")
                    }
            } catch (e: Throwable) {
                logger.error("Rsocket server input connection exception caught",e)
            }
        }
        return output
    }

流客户端测试:

    @Test
    fun testPingFlow() {
        val outToServer = MutableSharedFlow<String>(10)

        runBlocking {
            val socketFlow = rSocketRequester
                .route("testFlow")
                .data(outToServer.asFlux())
                .retrieveFlow<String>()
                .take(2)

            outToServer.emit("Ping ${System.currentTimeMillis()}")
            outToServer.emit("Ping ${System.currentTimeMillis()}")

            socketFlow
                .onCompletion { exception ->
                    logger.debug("Rsocket client output completed")
                    if (exception != null) {
                        logger.error("Exception received while processing Rsocket client output flow",exception)
                    }
                }
                .collect { message ->
                    logger.debug("Received pong from server $message")
                }
        }
    }

Flux 服务器端:

    @MessageMapping("testFlux")
    fun testPingFlux(input: Flux<String>): Flux<String> {
        val output = Sinks.many().unicast().onBackpressureBuffer<String>()
        try {
            input
                .doOnNext { message ->
                    logger.debug("Rsocket server input message received $message")
                }
                .doOnError { e ->
                    logger.error("Rsocket server input connection error",e)
                }
                .doOnCancel {
                    logger.debug("Rsocket server input cancelled")
                }
                .doOnComplete {
                    logger.debug("Rsocket server input completed")
                }
                .subscribe { message ->
                    output.tryEmitNext("pong ${System.currentTimeMillis()}")
                }
        } catch (e: Throwable) {
            logger.error("Rsocket server input connection exception caught",e)
        }
        return output.asFlux()
    }

Flux 客户端测试:

    @Test
    fun testPingFlux() {
        val outToServer = Sinks.many().unicast().onBackpressureBuffer<String>()

        rSocketRequester
            .route("testFlux")
            .data(outToServer.asFlux())
            .retrieveFlux<String>()
            .doOnCancel {
                logger.debug("Rsocket client output connection completed")
            }
            .doOnError { e ->
                logger.error("Exception received while processing Rsocket client output flow",e)
            }
            .take(2)
            .subscribe { message ->
                logger.debug("Received pong from server $message")
            }

        outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
        outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
    }

问题

上面的两个客户端/服务器片段实际上都来回发送 ping/pong 有效负载,但在每种情况下,我都无法在客户端的服务器端处理取消连接。我从客户端获得了自己的 Rsocket client output completed 日志行,然后是来自 Reactor 的 Operator called default onErrorDropped 以及来自 RSocket 的以下堆栈跟踪:

java.util.concurrent.CancellationException: Inbound has been canceled
    at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
    at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:267) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:377) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

这是一个问题,因为(除了这个玩具示例之外)我的应用程序需要在连接关闭时进行服务器端清理。

我尝试失败的事情

  • 在 Flows 或 Fluxen 上捕获异常、取消或完成的所有各种方法,其中许多方法在上面的示例中进行了说明。
  • 订阅/收集 lambda 表达式中的 try/catch 块。
  • 通过映射运算符将服务器响应 Flux/Flow 直接耦合到输入 Flux/Flow,而不是创建单独的输出 Flux/Flow。
  • 在调试器中逐步执行框架代码,我可以说它很快就迷失了。我从这次冒险中得出的最佳理论是,接收取消信号的 Flux/Flow 以某种方式与我的服务器方法接收的输入 Flux/Flow 分离,但有太多抽象层让我无法追踪它。

在此先感谢您的帮助。

解决方法

Bug filed,将此问题标记为已回答。感谢大家的快速回复。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res