微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

dubbo接口方法重载且入参未显式指定序列化id导致ClassCastException分析

问题描述&模拟

线上登录接口,通过监控查看,有类型转换异常,具体报错如下图

image-20220108220128374

此报错信息是dubbo consumer端显示,且登录大部分是正常,有少量部分会报类型转换异常,同事通过更换方法名+显示指定序列化id解决此问题,但是产生这个问题的真正原因是什么呢?没有指定序列化id吗?还是dubbo方法重载问题?为什么服务端不显示错误信息呢?,下面根据错误模拟下情况。

线上运行情况说明,报错的这台客户端部署在容器内,jdk版本

image-20220108220823552

服务方是混跑,有虚拟机和容器,容器的jdk版本相同,虚拟机jdk版本

image-20220108220912474

一开始认为是由于没有显示指定序列化id导致容器调用虚拟机的服务,由于jvm版本不一致导致的解码问题,但是分析和试验后,发现并非如此,模拟情况如下:

定义一个dubbo服务,方法重载且入参不显示指定序列化id,代码如下

//定义dubbo服务
public interface ProductService {
	Result<ProductVO> findProduct(String data);
	Result<ProductVO> findProduct(ProductDTO product);
}

//入参
@Data
public class ProductDTO  implements Serializable {
    //不显示指定序列化id
	private Integer productId;
	private String sn;
	private String code;
}

//出参
@Data
public class ProductVO implements Serializable{
	private static final long serialVersionUID = 4529782262922750326L;
	private Integer productId;
	private String productName;
}

dubbo客户端调用ProductService.findProduct(ProductDTO product),并使用jdk1.8.0_202版本,服务方使用jdk1.8.0_73版本,经过试验(jmeter压测),发现并未出现类型转换异常,现在通过代码分析来排除。

分析&dubbo provider处理请求流程

采用逆序方法,使用arthas进行反编译dubbo生成的代理类,ProductService生成的代理类是Wrapper2,内容如下

public Object invokeMethod(Object object, String name, Class[] classArray, Object[] objectArray)
			throws InvocationTargetException {
		ProductService productService;
		try {
			productService = (ProductService) object;
		} catch (Throwable throwable) {
			throw new IllegalArgumentException(throwable);
		}
		try {
			if ("findProduct".equals(name) && classArray.length == 1
					&& classArray[0].getName().equals("java.lang.String")) {
				return productService.findProduct((String) objectArray[0]);
			}
			if ("findProduct".equals(name) && classArray.length == 1
					&& classArray[0].getName().equals("org.pangu.dto.ProductDTO")) {
				return productService.findProduct((ProductDTO) objectArray[0]);
			}
		} catch (Throwable throwable) {
			throw new InvocationTargetException(throwable);
		}
		throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(name)
				.append("\" in class org.pangu.api.ProductService.").toString());
	}
}

通过查看反编译后的代码,得知dubbo方法重载,会根据方法类型和参数个数找到对应的目标方法执行。对于我这个线上问题,参数是ProductDTO,如果调用的是findProduct(String data),说明classArray[0]即参数类型是String类型,那么参数类型是如何得来的呢?根据自己之前写的dubbo流程分析,查看源码,在com.alibaba.dubbo.rpc.proxy.AbstractProxyInvoker#invoke(Invocation invocation)代码内容如下

image-20220108223056936

方法名称+方法类型+方面参数都封装在Invocation内,接着查找Invocation的来源,在dubboProtocol的匿名内部类dubboProtocol$1内发现,具体是reply(ExchangeChannel channel, Object message)方法内,参数message就是Invocation。

image-20220108225859703

接着看哪里调用dubboProtocol$1.reply(ExchangeChannel channel, Object message)方法,在com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest(ExchangeChannel channel, Request req)方法内,com.alibaba.dubbo.remoting.exchange.Request.getData()获取此Invocation,即DecodeableRpcInvocation,那么接着看Request 以及Request.mData的来源;

接着向上找,在com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received(Channel channel, Object message)的入参message就是Request ;

继续向上找,com.alibaba.dubbo.remoting.transport.DecodeHandler#received(Channel channel, Object message)的入参就是Request ,其中会对Request.mData即Invocation进行解码(认在IO线程已经解码过,这里实际并不会再执行解码DecodeableRpcInvocation#hasDecoded=true)。

image-20220108230753355

继续向上找,com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run()线程,message属性就是Request,那么接着只能找ChannelEventRunnable是如何创建并提交的

image-20220108230921910

继续向上找,在com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received(Channel channel, Object message)方法内创建ChannelEventRunnable并提交到线程池执行。

继续向上找,在com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler.received(Channel channel, Object message),入参message就是Request

继续向上找,com.alibaba.dubbo.remoting.transport.MultiMessageHandler.received(Channel channel, Object message)

image-20220109003230986

继续向上找,com.alibaba.dubbo.remoting.transport.AbstractPeer.received(Channel ch, Object msg)

继续向上找,com.alibaba.dubbo.remoting.transport.netty4.NettyServerHandler.channelRead(ChannelHandlerContext ctx, Object msg),看到这个就说明是netty的work线程,NettyServerHandler是个inbound & outbound事件

dubbo service netty启动添加的inbound&outbound即pipeline chain[HeadContext InternalDecoder InternalEncoder NettyServerHandler TailContext],说明前面肯定有执行InternalDecoder 的channelRead事件。此时入参message就是Request。

下面着重分析InternalDecoder 的channelRead事件,执行堆栈依次为:

InternalDecoder(io.netty.handler.codec.BytetoMessageDecoder).channelRead(ChannelHandlerContext ctx, Object msg)
InternalDecoder(io.netty.handler.codec.BytetoMessageDecoder).callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
dubboCountCodec.decode(Channel channel, ChannelBuffer buffer)
dubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)
dubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
dubboCodec.decodeBody(Channel channel, InputStream is, byte[] header)
DecodeableRpcInvocation.decode()
DecodeableRpcInvocation.decode(Channel channel, InputStream input)

InternalDecoder是netty pipeline的inboud事件,执行的是channelRead,具体逻辑在InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)内,代码如下

image-20220109010146098

接着触发下一个inbound的channelRead动作,传入的就是Request了,代码说明如下

image-20220109010534969

接着看dubboCountCodec.decode(Channel channel, ChannelBuffer buffer),这里进行解码

//com.alibaba.dubbo.rpc.protocol.dubbo.dubboCountCodec#decode(Channel channel, ChannelBuffer buffer)
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();//获取读位置
    MultiMessage result = MultiMessage.create();//MultiMessage是Request的集合
    do {
        Object obj = codec.decode(channel, buffer);//使用dubboCodec进行解码,下面根据解码结果进行不同处理
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {//说明发生了tcp粘包,退出循环
            buffer.readerIndex(save);
            break;
        } else {
            result.addMessage(obj);//把obj即Request添加到集合MultiMessage
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();//设置新的buffer读位置,继续使用dubboCodec进行解码
        }
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {//如果MultiMessage只有一个元素,则说明本次没有发生粘包
        return result.get(0);//返回Request
    }
    return result;//返回MultiMessage,在后续的MultiMessagehandler内获取Request的集合遍历处理
}

接着看dubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)解码过程,如何对dubbo协议解码的,先看下dubbo协议的报文结构

接着看代码,对着报文结构进行解码

//dubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer)
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);//把缓冲区字节存放到header
    return decode(channel, buffer, readable, header);
}

//dubboCodec(ExchangeCodec).decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
        || readable > 1 && header[1] != MAGIC_LOW) {//非魔数,说明非dubbo报文的开头,说明发生了tcp拆包/粘包
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // check length.
    if (readable < HEADER_LENGTH) {//为什么是小于16呢?因为dubbo报文 magic(2)+falg(1)+status(1)+invokerId(8)+bodyLenght(4)就是16字节了,小于16字节,肯定发生了拆包,本次接收到的数据并没有body
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    int len = Bytes.bytes2int(header, 12);//12的原因是dubbo报文 magic(2)+falg(1)+status(1)+invokerId(8)等于12,从12位后取4位,转换为int,就是body的长度
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    if (readable < tt) {//可读取数少于bodylen+16,说明tcp拆包,需要继续进网络读取
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header);//解码body内容
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}

接着看解码dubbo body,在com.alibaba.dubbo.rpc.protocol.dubbo.dubboCodec#decodeBody

//com.alibaba.dubbo.rpc.protocol.dubbo.dubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {//是响应,编码
        //省略
    } else {//请求,解码
        // decode request.
        Request req = new Request(id);
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            Object data;
            if (req.isHeartbeat()) {//心跳
                data = decodeHeartbeatData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
            } else if (req.isEvent()) {//事件
                data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
            } else {
                DecodeableRpcInvocation inv;
                if (channel.getUrl().getParameter(
                    Constants.DECODE_IN_IO_THREAD_KEY,
                    Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//认是在netty work线程进行解码
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    inv.decode();//解码dubbo body,解码结果保存在DecodeableRpcInvocation
                } else {
                    inv = new DecodeableRpcInvocation(channel, req,
                                                      new UnsafeByteArrayInputStream(readMessageData(is)), proto);//否则在业务线程ChannelEventRunnable进行解码
                }
                data = inv;
            }
            req.setData(data);//把Invocation保存到Request.mData
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request Failed: " + t.getMessage(), t);
            }
            // bad request
            req.setbroken(true);
            req.setData(t);
        }
        return req;
    }
}

接着看DecodeableRpcInvocation解码dubbo body

//com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode()
@Override
public void decode() throws Exception {
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            decode(channel, inputStream);//解码
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Decode rpc invocation Failed: " + e.getMessage(), e);
            }
            request.setbroken(true);
            request.setData(e);
        } finally {
            hasDecoded = true;//解码后置位已经解码,这样在ChannelEventRunnable线程内就不会再进行解码
        }
    }
}

//com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode(com.alibaba.dubbo.remoting.Channel, java.io.InputStream)
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
        .deserialize(channel.getUrl(), input);//根据序列化标识获取反序列对象,dubbo spi的自适应

    String dubboVersion = in.readUTF();//从输入流读取dubbo version
    request.setVersion(dubboVersion);
    setAttachment(Constants.dubBO_VERSION_KEY, dubboVersion);

    setAttachment(Constants.PATH_KEY, in.readUTF());//从输入流读path
    setAttachment(Constants.VERSION_KEY, in.readUTF());//从输入流读版本

    setMethodName(in.readUTF());//从输入流读 调用的目标方法名
    try {
        Object[] args;
        Class<?>[] pts;
        String desc = in.readUTF();//从输入流读 参数描述符,即参数的类型 比如[Ljava/lang/String
        if (desc.length() == 0) {//dubbo调用方法不存在入参
            pts = dubboCodec.EMPTY_CLASS_ARRAY;
            args = dubboCodec.EMPTY_OBJECT_ARRAY;
        } else {//dubbo调用方法存在入参
            pts = ReflectUtils.desc2classArray(desc);//类型描述符转换为类型,比如[Ljava/lang/String => Ljava.lang.String
            args = new Object[pts.length];//参数长度
            for (int i = 0; i < args.length; i++) {
                try {
                    args[i] = in.readobject(pts[i]);//从输入流读取参数,这里是readobject,执行反序列化
                } catch (Exception e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode argument Failed: " + e.getMessage(), e);
                    }
                }
            }
        }
        setParameterTypes(pts);//把参数类型保存到Invocation对象,即parameterTypes属性上

        Map<String, String> map = (Map<String, String>) in.readobject(Map.class);//从输入流读取隐式参数并解码
        if (map != null && map.size() > 0) {
            Map<String, String> attachment = getAttachments();
            if (attachment == null) {
                attachment = new HashMap<String, String>();
            }
            attachment.putAll(map);
            setAttachments(attachment);
        }
        //decode argument ,may be callback
        for (int i = 0; i < args.length; i++) {
            args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
        }

        setArguments(args);

    } catch (ClassNotFoundException e) {
        throw new IOException(StringUtils.toString("Read invocation data Failed.", e));
    } finally {
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
    }
    return this;
}

从解码dubbo body看出,从输入流解码获取调用的目标方法名称方法类型、方法入参、隐式参数都保存到Invocation对象(即DecodeableRpcInvocation),其中读取入参和隐式参数使用到了序列化解码(需要使用到序列化id),而从输入流获取方法名称+参数类型并没有使用对象的反序列化。

dubbo provider处理接收总结

dubbo prodiver端从网络到dubbo业务线程池调用以及如何解码流程分析完,现在总结下:

image-20220109225136447

dubbo provider接收并处理consumer请求分两步

1.网络通信,在io线程上解码,解码结果保存到Request。

2.IO线程调起dubbo业务线程,传入解码结果Request,通过Invoker调用目标方法,传入要执行目标方法的对象、方法名、参数类型、参数进行调用目标方法

该问题分析

解决2个问题

问题1:为什么在服务端报错ClassCastException,在服务端没有任何error日志呢?只有在客户端才有error日志

由于在dubbo代理类Wrapper2调用目标方法导致ClassCastException,异常被捕捉封装为InvocationTargetException向上抛,接着在com.alibaba.dubbo.rpc.proxy.AbstractProxyInvoker#invoke内异常被捕捉,封装为RpcResult,继而在ExceptionFilter内异常信息被封装为RuntimeException返回客户端。这中间并没有日志打印,因此不产生error日志,所以服务端看不到。

问题2:dubbo方法重载会导致问题吗?

结论,基本不会,dubbo的动态代理类WrapperX会根据Invocation的methodName+参数类型+参数进行调用目标方法,因此不会。网上有个大佬说dubbo方法重载在某种情况会导致问题,但是他写的语句有些不通顺且凌乱,而且蓝绿是流量隔离的,不会调错,我认为他的举例不合适,感兴趣的可以参考dubbo同名方法的问题及思考

问题3:是否是未显式指定序列化id导致的呢?

经过前面分析,是由于判断参数类型是String(本来应该是DTO类型),导致执行目标方法时候把参数转换为String导致的异常,参数类型来源于Invocation对象(即RpcInvocation.parameterTypes),而Invocation来源于Request.mData,而Request是网络通信解码得来,其中在com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode(com.alibaba.dubbo.remoting.Channel, java.io.InputStream)String desc = in.readUTF();从输入流读取字节流并解码为参数类型描述符,这个地方并不涉及到对象的序列化和反序列化。

看客户端编码代码InternalEncoder,编码参数类型代码如下图

image-20220111002311123

而客户端发送建立Request是在com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int),而Invocation对象是在dubbo调用的入口InvokerInvocationHandler内(new RpcInvocation(method, args)封装方法名+参数创建Invocation对象,继而参数类型就保存在了Invocation对象。

这样分析得来,不显示指定序列化id并不会导致这个问题

排除了jdk版本、不显示指定序列化ID等原因,具体是什么原因导致的dubbo方法重载导致调用ClassCastException呢?线上预发环境和生产网络是互通,是否是预发环境同事手工部署的应用只有入参String的方法呢(未和生产同步版本)?同事也记不清了,也无法查,这个问题暂时是无法知道答案了。

据我猜测,问题可能出现是预发环境部署的服务没有和生产版本同步(缺少findProduct(ProductDTOdata)导致),我们预发和生成网络是互通的,应该是生产客户端调用到了预发环境服务,而预发环境部署的此服务没有findProduct(ProductDTOdata)。

为什么需要显示指定序列化id

rpc调用使用的TCP通信,需要把对象转换为二进制流进行发送(编码)和接收(解码),那么就需要有套规则需要把内存中的java对象转换为二进制流,序列化就是做这个事情的。

在使用原生序列化的时候,serialVersionUID起到了一个类似版本号的作用,在反序列化的时候判断serialVersionUID如果不相同,会抛出InvalidClassException。

如果在使用原生序列化方式的时候官方是强烈建议指定一个serialVersionUID的,如果没有指定,在序列化过程中,jvm会自动计算出一个值作为serialVersionUID,由于这种运行时计算serialVersionUID的方式依赖于jvm的实现方式,如果序列化和反序列化的jvm实现方式不一样可能会导致抛出异常InvalidClassException,所以强烈建议指定serialVersionUID。

不显示指定序列化ID实际会导致问题吗?

定义一个dubbo的入参,不显示指定序列化id,客户端运行不变更,服务端入参进行增加删除字段(类结构发生变化),发现均能正常请求,并非像网上所说的不显示指定序列化id情况下rpc参数类结构变化,并没有导致什么问题,当然我只是在jdk8版本下进行了此测试(当然现在都是jdk8),这样情况下,实际使用过程中,不显示指定序列化id好像也不会影响什么呢

网上有说法,不显示指定序列化id会导致一种情况出现问题:举个例子:比如该入参没有显示指定序列化id,后面有个需求需要在这个入参增加个字段,而且看没有显示指定序列化id,顺手就增加了个序列化id,这样线上运行的客户端应用由于引用的还是旧jar,新的服务部署上去,就会发送序列化失败(客户端jvm生成的序列化id和服务端显示指定的序列化id不同),好像这种情况是无法避免的。但是我经过测试,不显示指定序列化id情况下 对dubbo参数进行增加字段、删除字段、增加方法等都不会造成反序列化问题(jdk8, dubbo2.6.8下测试),请求均正常。验证结果说明jvm生成序列化id和类的结构没有关系。可以参考别人测试结果,和我测试结果相同。

那么是否就可以大胆的不指定序列化id呢?还是建议不要,鬼知道jvm生成序列化id的实现方式呢,不指定万一线上哪天出现幺蛾子。

验证了半天,得到一个不指定序列化id也没关系的实际验证结论,但是又不敢完全放心大胆不显示指定序列化id,抓狂。。。

最终结论

根据实际验证(jdk8, dubbo2.6.8下测试),不显示指定序列化id时,dubbo的传输对象在增加字段、删除字段、增加方法等都不会造成反序列化问题,但是还是强烈建议显示指定序列化id,万一jvm生成序列化id不兼容了呢

结尾

分析了这么长,最终也没找到这个问题的产生原因,但是对dubbo的通信层又加深了理解,下面一篇记录下总结的dubbo通信层

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐