
Dubbo Source Code Analysis: Service Export

 


Service publishing: how it works


Publish action 1: export the local service
 

 Export dubbo service com.alibaba.dubbo.demo.DemoService to local registry, dubbo version: 2.0.0, current host: 127.0.0.1


Publish action 2: export the remote service

Export dubbo service com.alibaba.dubbo.demo.DemoService to url dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
   Register dubbo service com.alibaba.dubbo.demo.DemoService url dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&monitor=dubbo%3A%2F%2F192.168.48.117%3A2181%2Fcom.alibaba.dubbo.registry.RegistryService%3Fapplication%3Ddemo-provider%26backup%3D192.168.48.120%3A2181%2C192.168.48.123%3A2181%26dubbo%3D2.0.0%26owner%3Dwilliam%26pid%3D8484%26protocol%3Dregistry%26refer%3Ddubbo%253D2.0.0%2526interface%253Dcom.alibaba.dubbo.monitor.MonitorService%2526pid%253D8484%2526timestamp%253D1473908495729%26registry%3Dzookeeper%26timestamp%3D1473908495398&owner=william&pid=8484&side=provider&timestamp=1473908495465 to registry registry://192.168.48.117:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&backup=192.168.48.120:2181,192.168.48.123:2181&dubbo=2.0.0&owner=william&pid=8484&registry=zookeeper&timestamp=1473908495398, dubbo version: 2.0.0, current host: 127.0.0.1


Publish action 3: start Netty
 

 Start NettyServer bind /0.0.0.0:20880, export /192.168.100.38:20880, dubbo version: 2.0.0, current host: 127.0.0.1


Publish action 4: open the connection to zk
 

   INFO zookeeper.ClientCnxn: opening socket connection to server /192.168.48.117:2181


Publish action 5: register with zk
 

 Register: dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1


Publish action 6: subscribe to zk
 

   Subscribe: provider://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
   Notify urls for subscribe url provider://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, urls: [empty://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465], dubbo version: 2.0.0, current host: 127.0.0.1

 


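What is the difference between exporting a local service and exporting a remote service?
1. Local export: the service is exposed inside the same JVM, so calls do not need zk or any remote communication. For example, when a service calls its own interface in the same process, there is no need to open a network connection.
2. Remote export: the service's IP address and port are exposed to remote clients, and calls travel over the network.
What is the difference between persistent and ephemeral nodes in zk?
Persistent node: once created, it stays in zk until it is explicitly deleted.
Ephemeral node: it is bound to the client session; once that session expires, all ephemeral nodes created by that client are deleted.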
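
The difference between the two node types can be illustrated with a toy in-memory model. This is only a sketch: ToyNodeStore and its methods are hypothetical names invented for illustration, not the ZooKeeper client API, and session expiry is simulated by an explicit closeSession call.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of zk nodes; hypothetical, NOT the ZooKeeper API. */
class ToyNodeStore {
    // Marker owner for persistent nodes; real session ids are assumed non-empty.
    private static final String PERSISTENT = "";
    // path -> id of the session that owns the node
    private final Map<String, String> owners = new TreeMap<>();

    void createPersistent(String path) {
        owners.put(path, PERSISTENT);
    }

    void createEphemeral(String path, String sessionId) {
        owners.put(path, sessionId);
    }

    /** Simulates session expiry: every ephemeral node of that session disappears. */
    void closeSession(String sessionId) {
        owners.values().removeIf(sessionId::equals);
    }

    boolean exists(String path) {
        return owners.containsKey(path);
    }

    public static void main(String[] args) {
        ToyNodeStore store = new ToyNodeStore();
        store.createPersistent("/dubbo/com.alibaba.dubbo.demo.DemoService/providers");
        store.createEphemeral("/dubbo/com.alibaba.dubbo.demo.DemoService/providers/p1", "session-1");
        store.closeSession("session-1");
        System.out.println("providers dir exists: "
                + store.exists("/dubbo/com.alibaba.dubbo.demo.DemoService/providers"));
        System.out.println("provider node exists: "
                + store.exists("/dubbo/com.alibaba.dubbo.demo.DemoService/providers/p1"));
    }
}
```

This is why a provider that crashes drops out of the providers list on its own, while the directory nodes above it remain.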

First, here is the complete call flow of service export:

ServiceBean.onApplicationEvent
-->export()
  -->ServiceConfig.export()
    -->doExport()
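      -->doExportUrls()//contains a for loop: one service can be exported over several protocols, e.g. TCP and HTTP; the default is TCP (the dubbo protocol)
        -->loadRegistries(true)//assembles the registry URL information from dubbo.properties
        -->doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) 
          //if the scope is not configured as remote, do the local export (remote means: export remotely only)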
          -->exportLocal(URL url)
            -->proxyFactory.getInvoker(ref, (Class) interfaceClass, local)
              -->
ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
              -->extension.getInvoker(arg0, arg1, arg2)
                -->StubProxyFactoryWrapper.getInvoker(T proxy, Class<T> type, URL url) 
                  -->proxyFactory.getInvoker(proxy, type, url)
                    -->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
                      -->Wrapper.getWrapper(com.alibaba.dubbo.demo.provider.DemoServiceImpl)
                        -->makeWrapper(Class<?> c)
                      -->return new AbstractProxyInvoker<T>(proxy, type, url)
            -->protocol.export
            ->ProtocolFilterWrapper
                 -->Protocol$Adpative.export
                -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("injvm");
                -->extension.export(arg0)
                  -->ProtocolFilterWrapper.export
                    -->buildInvokerChain //creates 8 filters
                    -->ProtocolListenerWrapper.export
                      -->InjvmProtocol.export
                        -->return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap)
                        -->purpose: exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService, this=InjvmExporter
          //if the scope is not configured as local, export as a remote service (local means: export locally only)
          -->proxyFactory.getInvoker//same principle as the local export: obtain an Invoker object
          -->protocol.export(invoker)
            -->Protocol$Adpative.export
              -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry");
               -->extension.export(arg0)
                 -->ProtocolFilterWrapper.export
                   -->ProtocolListenerWrapper.export
                     -->RegistryProtocol.export
                       -->doLocalExport(originInvoker)
                         -->getCacheKey(originInvoker);//reads dubbo://192.168.100.51:20880/
                         -->protocol.export
                           -->Protocol$Adpative.export
                             -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("dubbo");
                             -->extension.export(arg0)
                               -->ProtocolFilterWrapper.export
                                 -->buildInvokerChain//creates 8 filters
                                 -->ProtocolListenerWrapper.export
---------1. Netty export starts------>DubboProtocol.export
                                     -->serviceKey(url)//builds key=com.alibaba.dubbo.demo.DemoService:20880
                                     -->purpose: exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService:20880, this=DubboExporter
                                     -->openServer(url)
                                       -->createServer(url)
--------2. exchange layer starts----------->Exchangers.bind(url, requestHandler)//the exchanger is the message exchange layer
                                           -->getExchanger(url)
                                             -->getExchanger(type)
                                               -->ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension("header")
                                           -->HeaderExchanger.bind
                                             -->Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
                                               -->new HeaderExchangeHandler(handler)//this.handler = handler
                                               -->new DecodeHandler
                                                  -->new AbstractChannelHandlerDelegate//this.handler = handler;
---------3. transport layer (transporter)-------->Transporters.bind
                                                 -->getTransporter()
                                                   -->ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
                                                 -->Transporter$Adpative.bind
                                                   -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension("netty");
                                                   -->extension.bind(arg0, arg1)
                                                     -->NettyTransporter.bind
                                                       -->new NettyServer(url, listener)
                                                         -->AbstractPeer //this.url = url;    this.handler = handler;
                                                         -->AbstractEndpoint//codec  timeout=1000  connectTimeout=3000
                                                         -->AbstractServer //bindAddress accepts=0 idleTimeout=600000
---------4. open the port, expose Netty---------------------->doOpen()
                                                           -->sets up NioServerSocketChannelFactory with the boss and worker thread pools; thread count is 3
                                                           -->sets the codec handler
                                                           -->bootstrap.bind(getBindAddress())
                                               -->new HeaderExchangeServer
                                                 -->this.server=NettyServer
                                                 -->heartbeat=60000
                                                 -->heartbeatTimeout=180000
                                                 -->startHeatbeatTimer()//a heartbeat timer backed by a thread pool; reconnects when the connection drops
                       -->getRegistry(originInvoker)//connect to zk
                         -->registryFactory.getRegistry(registryUrl)
                           -->ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper");
                           -->extension.getRegistry(arg0)
                             -->AbstractRegistryFactory.getRegistry//creates a registry and stores it in REGISTRIES
                               -->createRegistry(url)
                                 -->new ZookeeperRegistry(url, zookeeperTransporter)
                                   -->AbstractRegistry
                                     -->loadProperties()//purpose: load the contents of C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache into properties
                                     -->notify(url.getBackupUrls())//does nothing here
                                   -->FailbackRegistry
                                     -->retryExecutor.scheduleWithFixedDelay(new Runnable()//sets up a thread pool that checks and connects to the registry, retrying on failure
                                   -->ZookeeperRegistry
                                     -->zookeeperTransporter.connect(url)
                                       -->ZookeeperTransporter$Adpative.connect(url)
                                         -->ExtensionLoader.getExtensionLoader(ZookeeperTransporter.class).getExtension("zkclient");
                                         -->extension.connect(arg0)
                                           -->ZkclientZookeeperTransporter.connect
                                             -->new ZkclientZookeeperClient(url)
                                               -->AbstractZookeeperClient
                                               -->ZkclientZookeeperClient
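                                                 -->new ZkClient(url.getBackupAddress());//connect to zk
                                                 -->client.subscribeStateChanges(new IZkStateListener()//subscribes to connection state changes: disconnected, reconnected
                                       -->zkClient.addStateListener(new StateListener() 
                                         -->recover //runs after the connection was lost and re-established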
                                         
                       -->registry.register(registedProviderUrl)//create the node
                         -->AbstractRegistry.register
                         -->FailbackRegistry.register
                           -->doRegister(url)//sends the registration request to the zk server
                             -->ZookeeperRegistry.doRegister
                               -->zkClient.create
                                 -->AbstractZookeeperClient.create//dubbo/com.alibaba.dubbo.demo.DemoService/providers/
                                                            dubbo%3A%2F%2F192.168.100.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26
                                                            application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3D
                                                            com.alibaba.dubbo.demo.DemoService%26loadbalance%3Droundrobin%26methods%3DsayHello%26owner%3
                                                            Dwilliam%26pid%3D2416%26side%3Dprovider%26timestamp%3D1474276306353
                                   -->createEphemeral(path);//ephemeral node  dubbo%3A%2F%2F192.168.100.52%3A20880%2F.............
                                   -->createPersistent(path);//persistent node dubbo/com.alibaba.dubbo.demo.DemoService/providers
                                       
                                       
                       -->registry.subscribe//subscribe to zk
                         -->AbstractRegistry.subscribe
                         -->FailbackRegistry.subscribe
                           -->doSubscribe(url, listener)// sends the subscription request to the server
                             -->ZookeeperRegistry.doSubscribe
                               -->new ChildListener()
                                 -->implements childChanged
                                   -->which calls ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                                 //A
                               -->zkClient.create(path, false);//step 1: create the persistent node /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
                               -->zkClient.addChildListener(path, zkListener)
                                 -->AbstractZookeeperClient.addChildListener
                                   //C
                                   -->createTargetChildListener(path, listener)//step 3: handling of a received notification, delegated to FailbackRegistry.notify
                                     -->ZkclientZookeeperClient.createTargetChildListener
                                       -->new IZkChildListener() 
                                         -->implements handleChildChange //handles a received notification
                                          -->listener.childChanged(parentPath, currentChilds);
                                          -->which calls ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                                          -->the notification is then handled by FailbackRegistry.notify
                                   //B
                                   -->addTargetChildListener(path, targetListener)//step 2
                                     -->ZkclientZookeeperClient.addTargetChildListener
                                       -->client.subscribeChildChanges(path, listener)//step 2: start the subscription on /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
                       
                       -->notify(url, listener, urls)
                         -->FailbackRegistry.notify
                           -->doNotify(url, listener, urls);
                             -->AbstractRegistry.notify
                               -->saveProperties(url);//writes the provider's registered URL information to C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
                                 -->registryCacheExecutor.execute(new SaveProperties(version));//done on a thread pool
                               -->listener.notify(categoryList)
                                 -->RegistryProtocol.notify
                                   -->RegistryProtocol.this.getProviderUrl(originInvoker)//derives the providerUrl from the invoker's url

 

Now let's go through these steps in more detail.

ServiceBean.onApplicationEvent
ServiceBean.onApplicationEvent method
ServiceBean.afterPropertiesSet method
AnnotationBean.postProcessAfterInitialization method
public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            // export the service
            export();
        }
    }
-->export() method

ServiceConfig.export() method

 public synchronized void export() {
        // inherit the export and delay settings from the provider config
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }
        // should the export be delayed?
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            // export the service right away
            doExport();
        }
    }
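
The delay branch of export() can be tried out in isolation. The following is a minimal sketch, not Dubbo's code: DelayedExportSketch, the latch and awaitExported are hypothetical names, and doExport() here only records that it ran.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the delay logic in ServiceConfig.export(). */
class DelayedExportSketch {
    // Daemon thread, so a pending delayed export never keeps the JVM alive.
    private final ScheduledExecutorService delayExportExecutor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delay-export-sketch");
                t.setDaemon(true);
                return t;
            });
    private final CountDownLatch exported = new CountDownLatch(1);

    void export(Integer delayMillis) {
        if (delayMillis != null && delayMillis > 0) {
            // Delayed: hand doExport to the scheduler.
            delayExportExecutor.schedule(this::doExport, delayMillis.longValue(), TimeUnit.MILLISECONDS);
        } else {
            // No delay configured: export on the calling thread.
            doExport();
        }
    }

    private void doExport() {
        exported.countDown();
    }

    boolean isExported() {
        return exported.getCount() == 0;
    }

    /** Waits up to timeoutMillis for the export to happen. */
    boolean awaitExported(long timeoutMillis) {
        try {
            return exported.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        DelayedExportSketch service = new DelayedExportSketch();
        service.export(200);
        System.out.println("exported right after export(200): " + service.isExported());
        System.out.println("exported within 2s: " + service.awaitExported(2000));
    }
}
```

With a null or non-positive delay the export happens synchronously, which matches the else branch in the method above.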

ServiceConfig.doExport() method

        // validates <dubbo:application name="Trade" owner="Trade" organization="cpcn" version="v1.0.0" />; throws an exception if it is not configured
        checkApplication();
        // validates <dubbo:registry file="${dubbo.registry.file}" address="${dubbo.registry.address}" />; throws an exception if it is not configured
        checkRegistry();
        // if no protocol is configured, calls set(new ProtocolConfig())
        checkProtocol();
        appendProperties(this);
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        // export the URLs
        doExportUrls();
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);

 

Now look at ServiceConfig.doExportUrls():

  private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        // one service can be exported over several protocols, e.g. TCP and HTTP; the default is TCP (the dubbo protocol)
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

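Next, ServiceConfig.doExportUrlsFor1Protocol()

Local export

Remote export

RegistryProtocol.export()

Although the method is named doLocalExport(), what it actually runs is the remote export.

The protocol.export() call above ends up in the export method of DubboProtocol.

DubboProtocol.export()

Starting Netty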

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }
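
openServer is essentially a cache of servers keyed by address: create one on first use, reset it on later exports. Below is a minimal sketch of the same idea, assuming the map is a ConcurrentHashMap and using computeIfAbsent so that "create if missing" is a single atomic step. ServerCacheSketch and FakeServer are hypothetical names; no real server is started.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a one-server-per-address cache. */
class ServerCacheSketch {
    /** Stand-in for an exchange server bound to one address. */
    static final class FakeServer {
        final String address;
        final AtomicInteger resets = new AtomicInteger();

        FakeServer(String address) {
            this.address = address;
        }

        void reset() {
            resets.incrementAndGet();
        }
    }

    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    FakeServer openServer(String address) {
        boolean[] createdNow = {false};
        // computeIfAbsent runs the factory at most once per missing key.
        FakeServer server = serverMap.computeIfAbsent(address, a -> {
            createdNow[0] = true;
            created.incrementAndGet();
            return new FakeServer(a);
        });
        if (!createdNow[0]) {
            // The server already existed: apply the new settings instead.
            server.reset();
        }
        return server;
    }

    public static void main(String[] args) {
        ServerCacheSketch protocol = new ServerCacheSketch();
        FakeServer first = protocol.openServer("192.168.100.38:20880");
        FakeServer second = protocol.openServer("192.168.100.38:20880");
        System.out.println("same server reused: " + (first == second));
        System.out.println("servers created: " + protocol.created.get());
    }
}
```

Several services exported on the same host and port therefore share one Netty server.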

 

 

 

 

 

Pay attention to what the requestHandler class does:

 try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;

 

 

How the server handles a request once it is received: see the reply method

   public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsstr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsstr == null || methodsstr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsstr);
                    } else {
                        String[] methods = methodsstr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }
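
One detail in the last statement of reply as quoted above: in Java, + binds more tightly than ==, so "Unsupported request: " + message == null ? ... compares the concatenated string with null. That comparison is always false, so the "Unsupported request: " prefix never reaches the message, and a null message would cause a NullPointerException. The sketch below reproduces just that expression; PrecedenceDemo and its method names are made up for the demonstration.

```java
/** Demonstrates the operator-precedence pitfall; names are hypothetical. */
class PrecedenceDemo {
    /** Same shape as the quoted line: parsed as ("..." + message) == null ? null : (...). */
    static String asWritten(Object message) {
        return "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message);
    }

    /** Parenthesized so that only message is compared with null. */
    static String parenthesized(Object message) {
        return "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message));
    }

    public static void main(String[] args) {
        System.out.println(asWritten("ping"));      // prefix is missing
        System.out.println(parenthesized("ping"));  // prefix is present
        System.out.println(parenthesized(null));    // no exception for null
    }
}
```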

 

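Continuing with how Netty is opened

After the remote service is exported and Netty is started, the service is registered in zk

RegistryProtocol.export()

You may know about the local setting dubbo.registry.file=/root/.dubbo/Accountant-registry-172.31.1.187.cache. So when is this file loaded?

getRegistry takes a lock internally, and createRegistry returns a ZookeeperRegistry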

 

 public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        // pay special attention to the super(url) call
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(new StateListener() {
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }

 

Next, look at the super(url) call inside it:

    public FailbackRegistry(URL url) {
        super(url);
        int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // Check and connect to the registry
                try {
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at Failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
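
The constructor above only schedules retry(); the fail-back idea itself is "remember what failed, try it again on each tick". Here is a minimal sketch under simplifying assumptions: FailbackSketch is a hypothetical class, and the registry call is modelled as a Predicate that returns true on success, whereas the real code works with exceptions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

/** Hypothetical sketch of a fail-back registry: failed registrations are retried later. */
class FailbackSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    // Assumption: returns true when the registry accepted the URL.
    private final Predicate<String> doRegister;

    FailbackSketch(Predicate<String> doRegister) {
        this.doRegister = doRegister;
    }

    void register(String url) {
        if (!doRegister.test(url)) {
            // Keep the URL so that a later retry can pick it up.
            failedRegistered.add(url);
        }
    }

    /** One retry round: every URL that now registers successfully is dropped from the set. */
    void retry() {
        failedRegistered.removeIf(doRegister);
    }

    int pending() {
        return failedRegistered.size();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean registryUp = new AtomicBoolean(false);
        FailbackSketch registry = new FailbackSketch(url -> registryUp.get());
        ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
        retryExecutor.scheduleWithFixedDelay(registry::retry, 100, 100, TimeUnit.MILLISECONDS);

        registry.register("dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService");
        System.out.println("pending while registry is down: " + registry.pending());
        registryUp.set(true);
        Thread.sleep(500);
        System.out.println("pending after registry came back: " + registry.pending());
        retryExecutor.shutdownNow();
    }
}
```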

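Look at this:

After the cache file is loaded, the listeners are notified, but only if a listener is already subscribed to the service; otherwise nothing happens.

Let's see what this file looks like

Where is the loaded cache file used?

If subscribing fails, the service information is read from the cache file. In other words, services can still be called even when ZooKeeper is down.

When is this cache updated?

On subscription: the notify that follows updates it.

The cache is rewritten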
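
The load and save cycle of such a cache file can be sketched with java.util.Properties. This is an illustration only: RegistryCacheSketch and its methods are hypothetical, the file is a temp file rather than the real .dubbo cache, and storing one property per service key is an assumption about the layout.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical sketch of a registry cache backed by a properties file. */
class RegistryCacheSketch {
    private final Path file;
    private final Properties properties = new Properties();

    RegistryCacheSketch(Path file) {
        this.file = file;
    }

    /** Creates a cache on a fresh temp file (stand-in for the real cache path). */
    static RegistryCacheSketch onTempFile() {
        try {
            return new RegistryCacheSketch(Files.createTempFile("dubbo-registry-sketch", ".cache"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Called on notify: remember the URLs for a service and rewrite the file. */
    void save(String serviceKey, String urls) {
        properties.setProperty(serviceKey, urls);
        try (OutputStream out = Files.newOutputStream(file)) {
            properties.store(out, "registry cache sketch");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Called at startup: read whatever the last run left behind. */
    Properties reload() {
        Properties loaded = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            loaded.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return loaded;
    }

    public static void main(String[] args) {
        RegistryCacheSketch cache = onTempFile();
        cache.save("com.alibaba.dubbo.demo.DemoService",
                "dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService");
        System.out.println(cache.reload().getProperty("com.alibaba.dubbo.demo.DemoService"));
    }
}
```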

 

 

/dubbo/cpcn.payment.shwealthbank401.rpc.ShwbAccountService/providers/dubbo%3A%2F%2F192.168.140.43%3A7101%2Fcpcn.payment.shwealthbank401.rpc.ShwbAccountService%3Faccepts%3D2000%26anyhost%3Dtrue%26application%3DSHWealthBank401%26application.version%3Dv1.0.0%26charset%3DUTF-8%26delay%3D-1%26dispatcher%3Dmessage%26dubbo%3D2.0.1%26executes%3D500%26generic%3Dfalse%26interface%3Dcpcn.payment.shwealthbank401.rpc.ShwbAccountService%26methods%3Dtx4711%2Ctx4710%2Ctx4706%2Ctx4705%2Ctx4708%2Ctx4707%2Ctx4702%2Ctx4701%2Ctx4712%2Ctx4704%2Ctx4703%2Ctx4709%26organization%3Dcpcn%26owner%3DSHWealthBank401%26pid%3D29716%26queues%3D0%26revision%3DSNAPSHOT%26serialization%3Dhessian2%26side%3Dprovider%26threadpool%3Dcached%26threads%3D500%26timeout%3D15000%26timestamp%3D1546070995869%26transporter%3Dnetty%26version%3D1.0.0
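
As the path above shows, the last segment of a provider node is the provider URL in URL-encoded form. A minimal sketch of how such a path can be assembled with java.net.URLEncoder follows; ProviderPathSketch and providerPath are hypothetical names, and the root "dubbo" is passed in rather than read from any configuration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper that builds a zk provider node path. */
class ProviderPathSketch {
    static String providerPath(String root, String serviceInterface, String providerUrl) {
        try {
            // The whole provider URL becomes a single path segment, so it must be encoded.
            return "/" + root + "/" + serviceInterface + "/providers/"
                    + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(providerPath(
                "dubbo",
                "com.alibaba.dubbo.demo.DemoService",
                "dubbo://192.168.100.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider"));
    }
}
```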

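The registration information includes the version number

After registering in zk, subscribe to the service

Underneath, it still goes through ZooKeeper

The configurators node mainly records configuration information for the service

In summary, service export converts an Invoker into an Exporter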
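
To make the Invoker-to-Exporter step concrete, here is a heavily simplified sketch. The Invoker and Exporter types below are hypothetical miniatures, not Dubbo's interfaces; the only part taken from the flow above is that the exporter puts itself into an exporterMap under the service key, so that an incoming request can later be routed back to the invoker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical miniature of "export = wrap an Invoker in an Exporter and index it by key". */
class ExportSketch {
    interface Invoker {
        String serviceKey();
        Object invoke(String method, Object... args);
    }

    static final class Exporter {
        private final Invoker invoker;
        private final Map<String, Exporter> exporterMap;

        Exporter(Invoker invoker, Map<String, Exporter> exporterMap) {
            this.invoker = invoker;
            this.exporterMap = exporterMap;
            // Same idea as exporterMap.put(key, this) in the call flow.
            exporterMap.put(invoker.serviceKey(), this);
        }

        Invoker getInvoker() {
            return invoker;
        }

        void unexport() {
            exporterMap.remove(invoker.serviceKey());
        }
    }

    private final Map<String, Exporter> exporterMap = new ConcurrentHashMap<>();

    Exporter export(Invoker invoker) {
        return new Exporter(invoker, exporterMap);
    }

    /** What a request handler would do: find the exporter by key, then call its invoker. */
    Object dispatch(String serviceKey, String method, Object... args) {
        Exporter exporter = exporterMap.get(serviceKey);
        if (exporter == null) {
            throw new IllegalStateException("Not exported: " + serviceKey);
        }
        return exporter.getInvoker().invoke(method, args);
    }

    /** Demo invoker standing in for the proxy around DemoServiceImpl. */
    static Invoker demoInvoker() {
        return new Invoker() {
            public String serviceKey() {
                return "com.alibaba.dubbo.demo.DemoService:20880";
            }

            public Object invoke(String method, Object... args) {
                if ("sayHello".equals(method)) {
                    return "Hello " + args[0];
                }
                throw new IllegalArgumentException("Unknown method: " + method);
            }
        };
    }

    public static void main(String[] args) {
        ExportSketch protocol = new ExportSketch();
        protocol.export(demoInvoker());
        System.out.println(protocol.dispatch("com.alibaba.dubbo.demo.DemoService:20880", "sayHello", "world"));
    }
}
```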

 

 

 

 

 

 

 

 
