微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

集群 Vert.x 环境中的 SockJS 连接

如何解决集群 Vert.x 环境中的 SockJS 连接

vertx 应用程序在 Docker 容器中运行,在两个 EC2 实例上运行并且是集群的。

集群是通过 hazelcast-aws 插件实现的,应用程序是这样启动的:

docker run --name ... -p ... \
--network ... \
-v ... \
-d ... \
-c 'exec java \
-Dvertx.eventBus.options.setClustered=true \
-Dvertx.eventBus.options.setClusterPort=15701 \
-jar ... -conf ... \
-cluster'

没有以编程方式设置任何与集群相关的内容

客户端在第一个请求时打开一个套接字,并将其用于以后的类似请求。
每个请求将:

  1. 通过向事件总线发布消息来发起与服务器的异步请求
  2. 在事件总线上注册一个消费者,它将处理上述结果, 并传递对套接字连接的引用,它应该将结果发送到

由于 vertx 在集群时认执行循环并且有两个实例,这意味着任何实例都会获取所有其他消息(从上面的 1. 开始)并使仅连接到一个实例的客户端接收到所有消息的一半预期响应。

我想这是因为,即使注册的使用者有对套接字对象的引用,它也不能使用它,因为它是在不同的节点/网络服务器上创建的。

这是否正确,是否有一种方法可以将 100% 的消息发送到客户端,仅连接到一个节点,而无需引入 RabbitMQ 之类的东西?

这是 SockJS 处理程序代码

SockJSHandler sockJSHandler = SockJSHandler.create(vertx,new SockJSHandlerOptions());
sockJSHandler.socketHandler(socket -> {
    SecurityService securityService = (SecurityService) ServiceFactory.getService(SecurityService.class);
    if (securityService.socketHeadeRSSecurity(socket)) {
        socket.handler(socketMessage -> {
            try {
                LOGGER.trace("socketMessage: " + socketMessage);
                Socket socket = Json.decodeValue(socketMessage.toString(),Socket.class);
                Report report = socket.getReport();
                if (report != null) {
                    Account accountRequest = socket.getAccount();
                    Account accountDatabase = accountRequest == null ? null
                            : ((AccountService) ServiceFactory.getService(AccountService.class)).getById(accountRequest.getId());
                    Response result = securityService.socketReportSecurity(accountRequest,accountDatabase,report) ?
                            ((ReportService) ServiceFactory.getService(ReportService.class)).createOrUpdateReport(report,accountDatabase)
                            : new Response(Response.unauthorized);
                    if (Response.success.equals(result.getResponse())) {
                        //register a consumer
                        String consumerName = "report.result." + Timestamp.from(ClockFactory.getClock().instant());
                        vertx.eventBus().consumer(consumerName,message -> {
                            Response executionResult;
                            if ("success".equals(message.body())) {
                                try {
                                    Path csvFile = Paths.get(config.getString(Config.reportPath.getConfigName(),Config.reportPath.getDefaultValue())
                                            + "/" + ((Report) result.getPayload()).getId() + ".csv");
                                    executionResult = new Response(new JsonObject().put("csv",new String(Files.readAllBytes(csvFile))));
                                } catch (IOException ioEx) {
                                    executionResult = new Response(new Validator("Failed to read file.",ioEx.getMessage(),null,null));
                                    LOGGER.error("Failed to read file.",ioEx);
                                }
                            } else {
                                executionResult = new Response(new Validator("Report execution Failed",(String)message.body(),null));
                            }
                            //send second message to client
                            socket.write(Json.encode(executionResult));
                            vertx.eventBus().consumer(consumerName).unregister();
                        });
                        //order report execution
                        vertx.eventBus().send("report.request",new JsonObject()
                                .put("reportId",((Report) result.getPayload()).getId())
                                .put("consumerName",consumerName));
                    }
                    //send first message to client
                    socket.write(Json.encode(result));
                } else {
                    LOGGER.info("Insufficient data sent over socket: " + socketMessage.toString());
                    socket.end();
                }
            } catch (DecodeException dEx) {
                LOGGER.error("Error decoding message.",dEx);
                socket.end();
            }
        });
    } else {
        LOGGER.info("Illegal socket connection attempt from: " + socket.remoteAddress());
        socket.end();
    }
});
mainRouter.route("/websocket/*").handler(sockJSHandler);

有趣的是,当在本地主机上运行两个集群节点时,客户端获得 100% 的结果。

编辑: 这不是 SockJS,而是配置问题。

解决方法

由于 vertx 在集群时默认执行循环,并且有 两个实例,这意味着任何实例都会获取其他所有消息(来自 1.,上面)并使仅连接到一个实例的客户端收到所有预期响应的一半。

这个假设只是部分正确。 Vert.x 执行循环,是的,但这意味着每个实例将获得一半的连接,而不是一半的消息。

一旦建立连接,它的所有消息都会到达一个实例。

所以:

这是否正确,是否有办法将 100% 的消息发送到 客户端,只连接一个节点,不介绍东西 像RabbitMQ?

已经发生了。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。