微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

通过 Java RSocket 请求通道发送消息时出错

如何解决通过 Java RSocket 请求通道发送消息时出错

我正在尝试创建 Java RSocket 通道。 这是我的 SpringBoot 服务器端:

@MessageMapping("channel")
    Flux<Payload> channel(Publisher<Payload> payloads) {

        System.out.println("Received Request Channel.");

        return Flux
                .from(payloads)
                .map(incomingPayload ->
                        DefaultPayload
                                .create("Channel Response: " + incomingPayload.getDataUtf8()));
    }
}

这是我使用原始 RSocket 的客户端:

public class Client {

    private final RSocket socket;
    private final CompositeByteBuf Metadata;

    public static void main(String[] args) throws InterruptedException {

        final String route = "channel";


        Client client = new Client(route);
        client.sendDataViaChannel();        
    }

    public Client(final String route) {

        this.socket = RSocketConnector.create()
                .MetadataMimeType(WellKNownMimeType.MESSAGE_RSOCKET_COMPOSITE_MetaDATA.getString())
                .connect(TcpClientTransport.create("localhost",8888))
                .block();

        // Metadata for routing
        this.Metadata = ByteBufAllocator.DEFAULT.compositeBuffer();
        setupRoute(route);

        System.out.println("Socket created!");
    }

    private void setupRoute(final String route) {
        RoutingMetadata routingMetadata = TaggingMetadataCodec.createRoutingMetadata(ByteBufAllocator.DEFAULT,List.of(route));
        CompositeMetadataCodec.encodeAndAddMetadata(Metadata,ByteBufAllocator.DEFAULT,WellKNownMimeType.MESSAGE_RSOCKET_ROUTING,routingMetadata.getContent());

    }

 private void sendDataViaChannel() throws InterruptedException {

        Flux<Payload> payloads = Flux.range(1,512).map(i -> DefaultPayload.create(
                ByteBufAllocator.DEFAULT.buffer().writeBytes(("hello " + i).getBytes()),Metadata));

        socket.requestChannel(payloads)
                .doOnNext(p -> System.out.println(
                        "Received Back: " + p.getDataUtf8()
                ))
                .blockLast();
}

但是在启动客户端时出现此错误

Exception in thread "main" io.netty.util.IllegalReferenceCountException: refCnt: 0,decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at io.rsocket.util.DefaultPayload.create(DefaultPayload.java:107)
    at com.davide.client.Client.lambda$sendDataViaChannel$1(Client.java:102)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
    at reactor.core.publisher.Fluxrange$RangeSubscription.slowPath(Fluxrange.java:155)
    at reactor.core.publisher.Fluxrange$RangeSubscription.request(Fluxrange.java:110)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
    at io.rsocket.core.RequestChannelRequesterFlux.handleRequestN(RequestChannelRequesterFlux.java:714)
    at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:268)
    at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:211)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248)
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129)
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:365)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.BytetoMessageDecoder.fireChannelRead(BytetoMessageDecoder.java:324)
    at io.netty.handler.codec.BytetoMessageDecoder.fireChannelRead(BytetoMessageDecoder.java:311)
    at io.netty.handler.codec.BytetoMessageDecoder.callDecode(BytetoMessageDecoder.java:432)
    at io.netty.handler.codec.BytetoMessageDecoder.channelRead(BytetoMessageDecoder.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Flux.blockLast(Flux.java:2519)
        at com.davide.client.Client.sendDataViaChannel(Client.java:109)
        at com.davide.client.Client.main(Client.java:37)

有人可以帮我找出问题所在吗?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。