如何解决Micronaut 中使用 RxJava 的 Websocket 请求路由
我正在 Micronaut 中实现一个 WebSocket 路由器,它在 WebSocket“客户端”和 WebSocket“服务器”应用程序之间路由 WebSocket 文本消息。 每条消息都传达了主题 id,这是 url 路径 /{topic} 的一部分。来自同一主题的消息必须双向转发, 例如主题 A 的消息被转发到主题 A 等。see architecture diagram
WebSocket 路由
所以客户端创建到路由器的 WebSocket 连接,然后路由器创建到服务器的连接。这意味着一旦客户端创建到路由器的连接,路由器必须创建到服务器的连接。
为了在客户端 -> 服务器或服务器 -> 客户端的每个方向发送新消息,必须重用给定主题的现有连接。
这些连接将通过一些重试重新连接逻辑长期存在,我稍后会加入。
消息发送逻辑
客户端发送消息后,必须将其转发到服务器,反向服务器 -> 客户端也是如此。
问题
我被卡住的地方是:
- 如何使用 Micronaut 的 Flowable api 将消息从 ServerHandler 转发到 ClientHandler,反之亦然? 我需要的是一旦建立连接,只需保留连接建立的信息,并使用 Rx Java api 简单地转发 WebSocket 处理程序之间的消息。
最初我通过使用自定义的 WebSocketSession 缓存和 ServerHandler 和 ClientHandler 存储和检索来实现转发逻辑 来自缓存的 CompletableFuture。这种方法有效并且还解决了异步等待连接会话的问题, 但我想通过使用纯 RxJava Java 模式以更无状态的方式实现转发。 为会话使用自定义共享缓存的另一个设计缺陷是,它对于 Netty 中现有的会话注册表来说是多余的。
到目前为止,我已经基本实现了客户端和服务器的处理程序,客户端/服务器的 WebSocket 处理程序, 但我不知道如何正确集成 Flowable 的东西。
源代码ClientHandler:(处理从客户端到路由器的WebSocket连接)
@ServerWebSocket("/topic/{topicId}") public class ClientHandler {
private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class);
private RxWebSocketClient webSocketClient;
private ConnectionProperties connectionProperties;
public ClientHandler(@Client("${connection.url}") RxWebSocketClient webSocketClient,ConnectionProperties connectionProperties) {
this.webSocketClient = webSocketClient;
this.connectionProperties = connectionProperties;
}
@OnOpen
public void onOpen(String topicId,WebSocketSession session) {
LOG.info("Open connection for client topic: {}",topicId);
}
@OnMessage
public void onMessage(String topicId,String message,WebSocketSession session) {
LOG.info("New message from client topic: {}",topicId);
Flowable<ServerHandler> flowable = webSocketClient.connect(ServerHandler.class,connectionProperties.resolveURI(topicId));
//? so now what to do with this
// the basic function works,but new connection is created with each message
//and caching flowable would not be much better than caching WebSocket sessions
//also it would be nicer to connect in onOpen() and then subscribe in onMessage()
flowable.subscribe(serverHandler -> {
serverHandler.setClientSession(session); // register session to send messages back
serverHandler.send(message); // send message after connection
},t -> LOG.error("Error handling client topic: {}",topicId,t)); //handle exception
}
@OnClose
public void onClose(String topicId) {
LOG.info("Close connection for client topic: {}",topicId);
}
@OnError
public void onError(String topicId,WebSocketSession session,Throwable t) {
LOG.error("Error for client topic: {}",t);
}
}
源代码ServerHandler:(处理从路由器到服务器的WebSocket连接)
@ClientWebSocket("/topic/{topicId}") public class ServerHandler implements AutoCloseable,WebSocketSessionAware{
private static final Logger LOG = LoggerFactory.getLogger(ServerHandler.class);
private volatile WebSocketSession clientSession;
private volatile WebSocketSession serverSession;
private volatile String topicId;
@OnOpen
public void onOpen(String topicId,WebSocketSession session) {
this.topicId = topicId;
LOG.info("Open connection for server topic: {}",topicId);
}
@OnMessage
public Publisher<String> onMessage(String message) {
LOG.info("New message from server topic: {}",this.topicId);
return clientSession.send(message); //could potentially use WebSocketBroadcaster as well and send to topic based on predicate
}
@OnClose
public void onClose(WebSocketSession session) {
LOG.info("Close connection for server topic: {}",this.topicId);
if (clientSession != null) {
clientSession.close();
}
}
@Override
public void close() throws Exception {
LOG.info("Closing handler for server topic: {}",this.topicId);
}
@Override
public void setWebSocketSession(WebSocketSession serverSession) {
this.serverSession = serverSession;
}
public void send(String message) {
if (serverSession == null) {
throw new IllegalStateException("Can not send if connection not opened");
}
serverSession.sendAsync(message); //send message to server
}
public void forceClose() {
if (serverSession != null) {
serverSession.close();
}
}
public void setClientSession(WebSocketSession clientSession) {
this.clientSession = clientSession;
}
}
类似的现有代码
我在其他项目中搜索了类似的代码,并在 Spring Cloud Gateway(WebsocketRoutingFilter 类)中找到了类似功能的片段
在此处查看完整代码:https://github.com/spring-cloud/spring-cloud-gateway/blob/8722f4062ed21de28ebf56f69bccc5ad4ac1d29d/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/WebsocketRoutingFilter.java
这个例子使用了稍微不同的 api 和 Tomcat WebSocketSession 和 Reactor 库 但无论如何,我不知道如何在 Micronaut Netty 之上用 RxJava Flowable api 表达类似的东西。
return client.execute(url,this.headers,new WebSocketHandler() {
@Override
public Mono<Void> handle(WebSocketSession proxySession) {
// Use retain() for Reactor Netty
Mono<Void> proxySessionSend = proxySession
.send(session.receive().doOnNext(WebSocketMessage::retain));
// .log("proxySessionSend",Level.FINE);
Mono<Void> serverSessionSend = session
.send(proxySession.receive().doOnNext(WebSocketMessage::retain));
// .log("sessionSend",Level.FINE);
return Mono.zip(proxySessionSend,serverSessionSend).then();
}
额外问题:
2)如何处理超时和重试与 Flowable 的连接?
如果我有这样的代码:
Flowable flowable = serverFlowable.connect(ServerHandler.class,uri);
2a) 如果在给定超时后连接尝试失败,如何连接想要在连接建立后发送消息的等待订阅者并传播错误?
2b) 如果使用 RxJava 连接尝试失败,如何以指数方式重试连接(5,20,30 秒?)?我想 repeat() 可以用于这个吗?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。