Micronaut 中使用 RxJava 的 Websocket 请求路由

如何解决Micronaut 中使用 RxJava 的 Websocket 请求路由

我正在 Micronaut 中实现一个 WebSocket 路由器,它在 WebSocket“客户端”和 WebSocket“服务器”应用程序之间路由 WebSocket 文本消息。 每条消息都传达了主题 id,这是 url 路径 /{topic} 的一部分。来自同一主题的消息必须双向转发, 例如主题 A 的消息被转发到主题 A 等。see architecture diagram

WebSocket 路由
所以客户端创建到路由器的 WebSocket 连接,然后路由器创建到服务器的连接。这意味着一旦客户端创建到路由器的连接,路由器必须创建到服务器的连接。 为了在客户端 -> 服务器或服务器 -> 客户端的每个方向发送新消息,必须重用给定主题的现有连接。 这些连接将通过一些重试重新连接逻辑长期存在,我稍后会加入。

消息发送逻辑
客户端发送消息后,必须将其转发到服务器,反向服务器 -> 客户端也是如此。

问题
我被卡住的地方是:

  1. 如何使用 Micronaut 的 Flowable api 将消息从 ServerHandler 转发到 ClientHandler,反之亦然? 我需要的是一旦建立连接,只需保留连接建立的信息,并使用 Rx Java api 简单地转发 WebSocket 处理程序之间的消息。

最初我通过使用自定义的 WebSocketSession 缓存和 ServerHandler 和 ClientHandler 存储和检索来实现转发逻辑 来自缓存的 CompletableFuture。这种方法有效并且还解决了异步等待连接会话的问题, 但我想通过使用纯 RxJava Java 模式以更无状态的方式实现转发。 为会话使用自定义共享缓存的另一个设计缺陷是,它对于 Netty 中现有的会话注册表来说是多余的。

到目前为止,我已经基本实现了客户端和服务器的处理程序,客户端/服务器的 WebSocket 处理程序, 但我不知道如何正确集成 Flowable 的东西。

源代码ClientHandler:(处理从客户端到路由器的WebSocket连接)

@ServerWebSocket("/topic/{topicId}") public class ClientHandler {

private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class);

private RxWebSocketClient webSocketClient;

private ConnectionProperties connectionProperties;

public ClientHandler(@Client("${connection.url}") RxWebSocketClient webSocketClient,ConnectionProperties connectionProperties) {
    this.webSocketClient = webSocketClient;
    this.connectionProperties = connectionProperties;
}

@OnOpen
public void onOpen(String topicId,WebSocketSession session) {
    LOG.info("Open connection for client topic: {}",topicId);
}

@OnMessage
public void onMessage(String topicId,String message,WebSocketSession session) {
    LOG.info("New message from client topic: {}",topicId);
    
    Flowable<ServerHandler> flowable = webSocketClient.connect(ServerHandler.class,connectionProperties.resolveURI(topicId));
    //? so now what to do with this
    // the basic function works,but new connection is created with each message
    //and caching flowable would not be much better than caching WebSocket sessions
    //also it would be nicer to connect in onOpen() and then subscribe in onMessage()

    flowable.subscribe(serverHandler -> {
        serverHandler.setClientSession(session); // register session to send messages back
        serverHandler.send(message); // send message after connection           
    },t -> LOG.error("Error handling client topic: {}",topicId,t)); //handle exception
}

@OnClose
public void onClose(String topicId) {
    LOG.info("Close connection for client topic: {}",topicId);     
}

@OnError
public void onError(String topicId,WebSocketSession session,Throwable t) {
    LOG.error("Error for client topic: {}",t);
}

}

源代码ServerHandler:(处理从路由器到服务器的WebSocket连接)

@ClientWebSocket("/topic/{topicId}") public class ServerHandler implements AutoCloseable,WebSocketSessionAware{

private static final Logger LOG = LoggerFactory.getLogger(ServerHandler.class);

private volatile WebSocketSession clientSession;

private volatile WebSocketSession serverSession;

private volatile String topicId;

@OnOpen
public void onOpen(String topicId,WebSocketSession session) {
    this.topicId = topicId;
    LOG.info("Open connection for server topic: {}",topicId);
}

@OnMessage
public Publisher<String> onMessage(String message) {
    LOG.info("New message from server topic: {}",this.topicId);        
    return clientSession.send(message); //could potentially use WebSocketBroadcaster as well and send to topic based on predicate
}

@OnClose
public void onClose(WebSocketSession session) {
    LOG.info("Close connection for server topic: {}",this.topicId);
    if (clientSession != null) {            
        clientSession.close();
    }
}

@Override
public void close() throws Exception {
    LOG.info("Closing handler for server topic: {}",this.topicId);
}

@Override
public void setWebSocketSession(WebSocketSession serverSession) {
    this.serverSession = serverSession;
}

public void send(String message) {
    if (serverSession == null) {
        throw new IllegalStateException("Can not send if connection not opened");
    }
    serverSession.sendAsync(message); //send message to server
}

public void forceClose() {
    if (serverSession != null) {
        serverSession.close();
    }
}

public void setClientSession(WebSocketSession clientSession) {
    this.clientSession = clientSession;
}

}

类似的现有代码
我在其他项目中搜索了类似的代码,并在 Spring Cloud Gateway(WebsocketRoutingFilter 类)中找到了类似功能的片段 在此处查看完整代码:https://github.com/spring-cloud/spring-cloud-gateway/blob/8722f4062ed21de28ebf56f69bccc5ad4ac1d29d/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/WebsocketRoutingFilter.java

这个例子使用了稍微不同的 api 和 Tomcat WebSocketSession 和 Reactor 库 但无论如何,我不知道如何在 Micronaut Netty 之上用 RxJava Flowable api 表达类似的东西。

        return client.execute(url,this.headers,new WebSocketHandler() {
            @Override
            public Mono<Void> handle(WebSocketSession proxySession) {
                // Use retain() for Reactor Netty
                Mono<Void> proxySessionSend = proxySession
                        .send(session.receive().doOnNext(WebSocketMessage::retain));
                // .log("proxySessionSend",Level.FINE);
                Mono<Void> serverSessionSend = session
                        .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
                // .log("sessionSend",Level.FINE);
                return Mono.zip(proxySessionSend,serverSessionSend).then();
            } 

额外问题:
2)如何处理超时和重试与 Flowable 的连接? 如果我有这样的代码:

Flowable flowable = serverFlowable.connect(ServerHandler.class,uri);

2a) 如果在给定超时后连接尝试失败,如何连接想要在连接建立后发送消息的等待订阅者并传播错误?
2b) 如果使用 RxJava 连接尝试失败,如何以指数方式重试连接(5,20,30 秒?)?我想 repeat() 可以用于这个吗?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res