微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用Netty实现Dubbo RPC

emmmm不想一点一点介绍这个代码了,注释写的挺清楚的,主要思路就是Consumer通过反射获取对应的服务接口实例时,使用的代理模式。调用代理对象时的方法时,实际上是通过Netty的channel将对应的方法调用信息传给Provider直接wait()阻塞,然后通过synchronized+wait()+notify()实现线程通信,当Provider接收到客户端得请求时,根据请求数据去服务容器中寻找对应的服务进行调用处理(这里简单得通过一个Map模拟了服务容器,然后通过反射调用Method得invoke方法达到动态调用服务得办法),然后Provider调用成功后,将数据返回给consumer,consumer读取到处理结果后调用notify()方法,唤醒用户调用的代理方法,将处理结果返回。
代码中需要注意的点:

  1. 使用线程池获取线程通过channel发送数据给服务端,然后阻塞,通过简单的线程通信实现得到服务器响应后自动执行返回。
  2. 用户真正调用的是动态代理创建的代理对象,代理对象内部实现的才是上一步的步骤。
  3. 使用了Protobuf数据传输格式,需要编写*.proto文件,然后编译成Java源文件,最后记得加入protobuf的编解码器,
  4. 服务器通过一个Map模拟了服务调用功能的容器,然后通过反射的方式动态调用对用的方法
  5. 因为客户端没有加关闭同步的代码,不要加上关闭EventLoopGroup的代码。(别问我为啥,我因为这卡了一个多小时,各种找bug后面发现启动后自己关了)

差不多就这些内容,需要的同学可以分析一下对应的源码,客户端的handler代码实现相对比较巧妙,最后贴上代码

package client;

import service.RpcHelloService;

/**
 * 用于启动RPC的消费者服务
 */
public class ConsumerBootStrap {
    public static void main(String[] args) {
        ConsumerClient client = new ConsumerClient("127.0.0.1", 9898);
        RpcHelloService bean = (RpcHelloService) client.getBean(RpcHelloService.class);
        String msg = "你好呀,服务器。";
        String res = bean.hello(msg);
        System.out.println(res);
    }
}


package client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import protocol.Data;

import java.util.concurrent.Callable;

public class ConsumerChannelHandler
        extends SimpleChannelInboundHandler<Data.Rpcmessage>
        implements Callable{
    private Data.Rpcmessage msg;//需要发送给服务器的消息
    private ChannelHandlerContext context;//Netty的上下文对象,帮助我们发送调用请求
    private String result;//rpc的返回结果
    public void setMsg(Data.Rpcmessage msg) {
        System.out.println("setMsg");
        this.msg = msg;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("通道连接成功...");
        context = ctx;
    }

    @Override
    protected synchronized void channelRead0(ChannelHandlerContext ctx, Data.Rpcmessage msg) throws Exception {
        if (msg.getMsgType().equals(Data.Rpcmessage.MessageType.Response) ){
            if ("200".equals(msg.getCode())) {
                result = msg.getResult();//如果响应成功了,那么将结果赋给result
            }
            notify();//唤醒调用线程
        }
    }

    @Override
    public synchronized Object call() throws Exception {
        System.out.println("call...");
        //调用call方法将消息发送给服务提供者
        context.writeAndFlush(msg);
        wait();//等待服务提供者响应结果然后继续执行
        return result;
    }
}

package client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.socketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.string.StringEncoder;
import protocol.Data;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 消费者底层Netty网络通信客户端
 */
public class ConsumerClient {
    //用于发送服务调用的线程池
    private ExecutorService pool = Executors.newFixedThreadPool(5);
    private ConsumerChannelHandler handler;
    private  final String HOST;
    private  final int PORT;
    public ConsumerClient(String host, int port) {
        this.HOST = host;
        this.PORT = port;
    }

    public Object getBean(Class clazz){

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{clazz},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        //实际的业务方法需要通过线程池进行远程调用
                        //RpcHelloService hello
                        if (handler == null){
                            startClient();
                        }
                        Data.Rpcmessage message = Data.Rpcmessage.newBuilder().
                                setArgs((String) args[0])//设置方法执行的参数
                                .setClassName(clazz.getSimpleName())//设置远程调用的类名
                                .setMethod(method.getName())//设置调用方法名
                                .setParamClazz(args[0].getClass().getName())
                                .setMsgType(Data.Rpcmessage.MessageType.Rquest)//设置当前是一个请求调用
                                .build();
                        handler.setMsg(message);
                        return pool.submit(handler).get();//真正调用方法是远程的方法。
                    }
                }
        );
    }


    //并不是在创建ConsumerClient时候就启动客户端,只有真正准备调用服务接口之后才会准备启动
    private void startClient(){
        //客户端启动程序
        handler = new ConsumerChannelHandler();
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入protobuf的编解码器
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new ProtobufDecoder(Data.Rpcmessage.getDefaultInstance()));
                            //加入自定义业务处理器
                            pipeline.addLast(handler);
                        }
                    });
            bootstrap.connect(HOST, PORT).sync();
            System.out.println("客户端启动成功...");
        } catch (InterruptedException e) {
            e.printstacktrace();
        }

    }


}

Syntax="proto3";//指定protobuf的版本
option java_outer_classname="Data";
message Rpcmessage{
    string className = 1;//调用的类名
    string method = 2;//调用方法名
    string paramClazz = 3;//参数的全类名
    string args = 4;//方法调用所需要的参数
    string code = 5;//响应状态码
    string result = 6;//响应结果
    MessageType msgType = 7;//当前消息是请求还是响应
    enum MessageType{
        Rquest = 0;
        Response = 1;
    }
}
package server;

public class ProviderBootStrap {
    public static void main(String[] args) {
        ProviderServer server = new ProviderServer(9898);
        server.start();
    }
}

package server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import protocol.Data;
import service.RpcHelloService;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;

public class ProviderChannelHandler extends SimpleChannelInboundHandler<Data.Rpcmessage> {
    //使用一个hashMap模拟服务调用容器
    private static final HashMap<String,Object> MAP = new HashMap<>();

    static {
        RpcHelloService rpcHelloService = new RpcHelloService() {
            @Override
            public String hello(String msg) {
                return "你好呀,客户端,我已经收到了你的请求:["+msg+"]";
            }
        };
        MAP.put("RpcHelloService.hello",rpcHelloService);
        System.out.println("初始化容器");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Data.Rpcmessage msg) throws Exception {
        //收到了远程调用请求
        System.out.println("服务器读取数据");
        if (Data.Rpcmessage.MessageType.Rquest == msg.getMsgType()){
            String className = msg.getClassName();
            String method = msg.getmethod();
            String args = msg.getArgs();
            String paramClazz = msg.getParamClazz();
            String result = invokeRpc(className,method,args,paramClazz);
            Data.Rpcmessage resMsg =  Data.Rpcmessage.newBuilder()
                    .setMsgType(Data.Rpcmessage.MessageType.Response)
                    .setResult(result)
                    .setCode("200")
                    .build();
            ctx.writeAndFlush(resMsg);
        }
    }

    private String invokeRpc(String className, String method, String args,String paramClazz) {
        try {
            String key = className+"."+method;
            Object o = MAP.get(key);
            Method tarMethod = o.getClass().getDeclaredMethod(method, Class.forName(paramClazz));
            Object result = tarMethod.invoke(o, args);
            return (String) result;
        } catch (Exception e) {
            e.printstacktrace();
        }
        return null;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printstacktrace();
        ctx.close();
    }
}

package server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.socketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.string.StringDecoder;
import protocol.Data;

public class ProviderServer {
    private final int port;

    public ProviderServer(int port) {
        this.port = port;
    }
    public void start(){
        this.startServer0();
    }

    private void startServer0(){
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new ProtobufDecoder(Data.Rpcmessage.getDefaultInstance()));
                            pipeline.addLast(new ProviderChannelHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            System.out.println("服务器启动成功.....");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printstacktrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}

package service;

public interface RpcHelloService {

    String hello(String msg);
}

protobuf编译后的源代码因为太长了,就不贴了,通过protoc.exe指令直接编译即可。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐