微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用Tcp.OutgoingConnection的Akka Stream替换套接字

如何解决使用Tcp.OutgoingConnection的Akka Stream替换套接字

我必须将Socket应用程序转换为akka,但似乎没有响应。我是akka的新手,修改示例似乎无效。

使用普通套接字建立与设备的连接,并且在将HEX cmd传递给设备后将返回HEX数据。 这是套接字线程的原始代码

            BlockingQueue<byte[]> incomingResults = new LinkedBlockingQueue<>();

            clientSocket = new Socket(ipAddress,port);
            clientSocket.setSoTimeout(5000);
            out = new DataOutputStream(clientSocket.getoutputStream());
            in = clientSocket.getInputStream();

            byte[] read = new byte[512];
            int len;
            out.write(Hex.decodeHex("F000001A501101007F1201F7"));
            Logger.debug("Connected and requested data on " + ipAddress + ":" + port);
            while ((len = in.read(read)) > -1) {
                incomingResults.add(Arrays.copyOfRange(read,len));
                if (new Date().getTime() - lastFe.getTime() > 8000) { // The data flow for approx 10 seconds,so need to re-request.
                    out.write(Hex.decodeHex("F000001A501101007F1201F7"));
                    lastFe = new Date();
                }
            }

此输入结果然后由另一个线程处理。 我正在使用Play!使用akka和使用原始线程的框架不能很好地工作。 但是我似乎无法弄清楚如何将它们放在一起,建立连接,然后能够发送cmd并继续处理传入的数据(并偶尔发出刷新cmd)。

如果可以立即异步处理数据,那么最好写一个单独的akka​​流,我认为可以用Materializer完成,但是我还没走那么远。

这是我的第一次尝试:

    String requestCmd = "F000001A501101007F1201F7";

    final Flow<ByteString,ByteString,CompletionStage<Tcp.OutgoingConnection>> connection =
        Tcp.get(system).outgoingConnection(ipAddress,port);

    final Flow<String,NotUsed> replParser =
        Flow.<String>create()
            .takeWhile(elem -> !elem.equals("q"))
            .concat(Source.single("BYE")) // will run after the original flow completes
            .map(elem -> ByteString.fromString(elem + "\n"));

    final Flow<ByteString,NotUsed> repl =
            Flow.of(ByteString.class)
                    .map(ByteString::utf8String)
                    .map(elem -> requestCmd)
                    .via(replParser)
                    .via(Framing.delimiter(ByteString.fromString("\n"),256,FramingTruncation.disALLOW))
                    .map(
                            text -> {
                                Logger.debug("Server: " + text.decodeString(ByteString.UTF_8()));
                                return "next";
                            }).via(replParser);

但这只会返回我发出的命令。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。