Dubbo: Service Registration and Publishing

Picking up where the previous article left off, this one analyzes how Dubbo registers and publishes services.

First, a look at how Dubbo parses its custom tags.

Approach 1: XML

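In the dubbo-config-spring submodule of dubbo-config, look under resources/META-INF for the spring.handlers file. It tells Spring which local class handles a custom XML namespace. Its content is stored as key/value pairs: the key is the namespace URI that must be declared in the schema header of our XML, and the value is the class that parses the tags in that namespace. An XSD file is defined alongside it to constrain what may be written under that namespace.

The handler configured there is DubboNamespaceHandler. It extends the abstract class NamespaceHandlerSupport from the beans module of spring-framework, which in turn implements the NamespaceHandler interface. That interface declares an init() method, and in init() DubboNamespaceHandler registers a DubboBeanDefinitionParser for every tag, so the tags we configure in XML get parsed.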
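The key/value lookup described above can be sketched in plain Java. This is only an illustration of the mapping idea, not Spring's actual loader: the class name HandlerMappingSketch and the method resolveHandler are hypothetical, and the single entry is written in the shape such a file uses.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Sketch only: mimics how a namespace URI is mapped to a handler class name.
class HandlerMappingSketch {

    // Content in the same key=value shape as META-INF/spring.handlers.
    // The colon is escaped because ':' is a key/value separator in .properties files.
    static final String SPRING_HANDLERS =
            "http\\://dubbo.apache.org/schema/dubbo="
            + "org.apache.dubbo.config.spring.schema.DubboNamespaceHandler\n";

    // Returns the handler class name registered for the namespace, or null if there is none.
    static String resolveHandler(String namespaceUri) {
        Properties mappings = new Properties();
        try {
            mappings.load(new StringReader(SPRING_HANDLERS));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return mappings.getProperty(namespaceUri);
    }

    public static void main(String[] args) {
        System.out.println(resolveHandler("http://dubbo.apache.org/schema/dubbo"));
    }
}
```

The real lookup is done by Spring while it reads the XML; the sketch only shows why the namespace URI in the XML header has to match the key exactly.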

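In init(), the first argument of each registerBeanDefinitionParser() call is the name of a <dubbo:xxx> tag we can configure in XML. Each tag is parsed into the class passed to the DubboBeanDefinitionParser constructor. For example, the first entry, application, turns <dubbo:application> in the configuration file into an ApplicationConfig, which finally becomes a BeanDefinition so that Spring can manage Dubbo's objects.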

public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
        // tag name -> parser that produces a bean definition of the given config class
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
        registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
        registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
    
    ......
}

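DubboBeanDefinitionParser implements the BeanDefinitionParser interface from the beans module of spring-framework. That interface declares a single method,

BeanDefinition parse(Element element, ParserContext parserContext)

which parses the tags in our XML.

The main implementation code of DubboBeanDefinitionParser: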

@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
    return parse(element, parserContext, beanClass, required);
}

@SuppressWarnings("unchecked")
private static RootBeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
    RootBeanDefinition beanDefinition = new RootBeanDefinition();
    beanDefinition.setBeanClass(beanClass);
    beanDefinition.setLazyInit(false);
    // resolve the bean id, falling back to name / "dubbo" / interface / class name
    String id = resolveAttribute(element, "id", parserContext);
    if (StringUtils.isEmpty(id) && required) {
        String generatedBeanName = resolveAttribute(element, "name", parserContext);
        if (StringUtils.isEmpty(generatedBeanName)) {
            if (ProtocolConfig.class.equals(beanClass)) {
                generatedBeanName = "dubbo";
            } else {
                generatedBeanName = resolveAttribute(element, "interface", parserContext);
            }
        }
        if (StringUtils.isEmpty(generatedBeanName)) {
            generatedBeanName = beanClass.getName();
        }
        id = generatedBeanName;
        int counter = 2;
        while (parserContext.getRegistry().containsBeanDefinition(id)) {
            id = generatedBeanName + (counter++);
        }
    }
    if (StringUtils.isNotEmpty(id)) {
        if (parserContext.getRegistry().containsBeanDefinition(id)) {
            throw new IllegalStateException("Duplicate spring bean id " + id);
        }
        parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
        beanDefinition.getPropertyValues().addPropertyValue("id", id);
    }
    // tag-specific handling
    if (ProtocolConfig.class.equals(beanClass)) {
        for (String name : parserContext.getRegistry().getBeanDefinitionNames()) {
            BeanDefinition definition = parserContext.getRegistry().getBeanDefinition(name);
            PropertyValue property = definition.getPropertyValues().getPropertyValue("protocol");
            if (property != null) {
                Object value = property.getValue();
                if (value instanceof ProtocolConfig && id.equals(((ProtocolConfig) value).getName())) {
                    definition.getPropertyValues().addPropertyValue("protocol", new RuntimeBeanReference(id));
                }
            }
        }
    } else if (ServiceBean.class.equals(beanClass)) {
        String className = resolveAttribute(element, "class", parserContext);
        if (StringUtils.isNotEmpty(className)) {
            RootBeanDefinition classDefinition = new RootBeanDefinition();
            classDefinition.setBeanClass(ReflectUtils.forName(className));
            classDefinition.setLazyInit(false);
            parseProperties(element.getChildNodes(), classDefinition, parserContext);
            beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
        }
    } else if (ProviderConfig.class.equals(beanClass)) {
        parseNested(element, parserContext, ServiceBean.class, true, "service", "provider", id, beanDefinition);
    } else if (ConsumerConfig.class.equals(beanClass)) {
        parseNested(element, parserContext, ReferenceBean.class, false, "reference", "consumer", id, beanDefinition);
    }
    // map every public setter of the config class to an XML attribute
    Set<String> props = new HashSet<>();
    ManagedMap parameters = null;
    for (Method setter : beanClass.getMethods()) {
        String name = setter.getName();
        if (name.length() > 3 && name.startsWith("set")
                && Modifier.isPublic(setter.getModifiers())
                && setter.getParameterTypes().length == 1) {
            Class<?> type = setter.getParameterTypes()[0];
            String beanProperty = name.substring(3, 4).toLowerCase() + name.substring(4);
            String property = StringUtils.camelToSplitName(beanProperty, "-");
            props.add(property);
            // check the setter/getter whether match
            Method getter = null;
            try {
                getter = beanClass.getMethod("get" + name.substring(3), new Class<?>[0]);
            } catch (NoSuchMethodException e) {
                try {
                    getter = beanClass.getMethod("is" + name.substring(3), new Class<?>[0]);
                } catch (NoSuchMethodException e2) {
                    // ignore, there is no need any log here since some class implement the interface: EnvironmentAware,
                    // ApplicationAware, etc. They only have setter method, otherwise will cause the error log during application start up.
                }
            }
            if (getter == null
                    || !Modifier.isPublic(getter.getModifiers())
                    || !type.equals(getter.getReturnType())) {
                continue;
            }
            if ("parameters".equals(property)) {
                parameters = parseParameters(element.getChildNodes(), beanDefinition, parserContext);
            } else if ("methods".equals(property)) {
                parseMethods(id, element.getChildNodes(), beanDefinition, parserContext);
            } else if ("arguments".equals(property)) {
                parseArguments(id, element.getChildNodes(), beanDefinition, parserContext);
            } else {
                String value = resolveAttribute(element, property, parserContext);
                if (value != null) {
                    value = value.trim();
                    if (value.length() > 0) {
                        if ("registry".equals(property) && RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(value)) {
                            RegistryConfig registryConfig = new RegistryConfig();
                            registryConfig.setAddress(RegistryConfig.NO_AVAILABLE);
                            beanDefinition.getPropertyValues().addPropertyValue(beanProperty, registryConfig);
                        } else if ("provider".equals(property) || "registry".equals(property) || ("protocol".equals(property) && AbstractServiceConfig.class.isAssignableFrom(beanClass))) {
                            /**
                             * For 'provider' 'protocol' 'registry', keep literal value (should be id/name) and set the value to 'registryIds' 'providerIds' protocolIds'
                             * The following process should make sure each id refers to the corresponding instance, here's how to find the instance for different use cases:
                             * 1. Spring, check existing bean by id, see{@link ServiceBean#afterPropertiesSet()}; then try to use id to find configs defined in remote Config Center
                             * 2. API, directly use id to find configs defined in remote Config Center; if all config instances are defined locally, please use {@link ServiceConfig#setRegistries(List)}
                             */
                            beanDefinition.getPropertyValues().addPropertyValue(beanProperty + "Ids", value);
                        } else {
                            Object reference;
                            if (isPrimitive(type)) {
                                if ("async".equals(property) && "false".equals(value)
                                        || "timeout".equals(property) && "0".equals(value)
                                        || "delay".equals(property) && "0".equals(value)
                                        || "version".equals(property) && "0.0.0".equals(value)
                                        || "stat".equals(property) && "-1".equals(value)
                                        || "reliable".equals(property) && "false".equals(value)) {
                                    // backward compatibility for the default value in old version's xsd
                                    value = null;
                                }
                                reference = value;
                            } else if (ONRETURN.equals(property) || ONTHROW.equals(property) || ONINVOKE.equals(property)) {
                                int index = value.lastIndexOf(".");
                                String ref = value.substring(0, index);
                                String method = value.substring(index + 1);
                                reference = new RuntimeBeanReference(ref);
                                beanDefinition.getPropertyValues().addPropertyValue(property + METHOD, method);
                            } else {
                                if ("ref".equals(property) && parserContext.getRegistry().containsBeanDefinition(value)) {
                                    BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(value);
                                    if (!refBean.isSingleton()) {
                                        throw new IllegalStateException("The exported service ref " + value + " must be singleton! Please set the " + value + " bean scope to singleton, eg: <bean id=\"" + value + "\" scope=\"singleton\" ...>");
                                    }
                                }
                                reference = new RuntimeBeanReference(value);
                            }
                            beanDefinition.getPropertyValues().addPropertyValue(beanProperty, reference);
                        }
                    }
                }
            }
        }
    }
    // attributes without a matching setter end up in the generic "parameters" map
    NamedNodeMap attributes = element.getAttributes();
    int len = attributes.getLength();
    for (int i = 0; i < len; i++) {
        Node node = attributes.item(i);
        String name = node.getLocalName();
        if (!props.contains(name)) {
            if (parameters == null) {
                parameters = new ManagedMap();
            }
            String value = node.getNodeValue();
            parameters.put(name, new TypedStringValue(value, String.class));
        }
    }
    if (parameters != null) {
        beanDefinition.getPropertyValues().addPropertyValue("parameters", parameters);
    }
    return beanDefinition;
}
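Two small pieces of the parser are easy to try in isolation: deriving an XML attribute name from a setter name, and making a generated bean id unique with a counter. The sketch below is a simplified stand-in written for this article. ParserNamingSketch, setterToAttribute and uniqueId are hypothetical names, and the camel-case splitting only approximates what StringUtils.camelToSplitName does.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch only: simplified versions of two steps inside parse().
class ParserNamingSketch {

    // Strips "set", lower-cases the first letter, then splits camel case with '-'.
    // Assumes the name is longer than three characters, as the parser checks.
    static String setterToAttribute(String setterName) {
        String beanProperty = setterName.substring(3, 4).toLowerCase() + setterName.substring(4);
        StringBuilder attribute = new StringBuilder();
        for (char c : beanProperty.toCharArray()) {
            if (Character.isUpperCase(c)) {
                attribute.append('-').append(Character.toLowerCase(c));
            } else {
                attribute.append(c);
            }
        }
        return attribute.toString();
    }

    // Appends 2, 3, ... to the base name until the id is unused,
    // mirroring the counter loop at the top of parse().
    static String uniqueId(String base, Set<String> registered) {
        String id = base;
        int counter = 2;
        while (registered.contains(id)) {
            id = base + (counter++);
        }
        return id;
    }

    public static void main(String[] args) {
        System.out.println(setterToAttribute("setMetadataReport"));
        Set<String> registered = new HashSet<>(Arrays.asList("dubbo", "dubbo2"));
        System.out.println(uniqueId("dubbo", registered));
    }
}
```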

Readers who are curious about the individual branches can step through them in a debugger. Next, how the annotation approach is implemented.

Approach 2: Annotation

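Annotation support starts from the @DubboComponentScan annotation, which uses Spring's @Import annotation to import DubboComponentScanRegistrar.class. That class implements the ImportBeanDefinitionRegistrar interface, an extension point that Spring Boot itself relies on in many places; it is not explored further here.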

Service Publishing

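Back to the init() method mentioned above. Services are published through the @Service annotation (deprecated in 2.7.7 and replaced by @DubboService) or the <dubbo:service> tag. Either way, the configured class is turned into a ServiceBean.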

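The class implements six interfaces:

  • InitializingBean (runs initialization logic)
  • DisposableBean (runs destruction logic)
  • ApplicationContextAware (obtains the ApplicationContext)
  • ApplicationListener<ContextRefreshedEvent> (event listening; removed in 2.7.7)
  • BeanNameAware (obtains the current bean name)
  • ApplicationEventPublisherAware (publishes events)

All six are Spring interfaces.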

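When the Spring context finishes loading, the onApplicationEvent() method of the ApplicationListener interface is triggered. It first checks whether the service has already been exported; if not, it calls export().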

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        export();
    }
}

@Override
public void export() {
    // delegate to the parent class, ServiceConfig
    super.export();
    // Publish ServiceBeanExportedEvent
    publishExportEvent();
}

Next, the export() method of the parent class ServiceConfig:

public synchronized void export() {
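    // check and update the configuration, e.g. fill in defaults such as port and protocol
    checkAndUpdateSubConfigs();
    // whether to export at all; setting export = false on @Service keeps the interface unexposed (default: true)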
    if (!shouldExport()) {
        return;
    }

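    // whether to delay the export, configurable via the delay attribute of @Service;
    // presumably offered because exporting before the Spring container is fully initialized could fail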
    if (shouldDelay()) {
        delayExportExecutor.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }
}
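The export-now-or-later decision can be reproduced with the JDK's own scheduler. The sketch below is not Dubbo code: DelayedExportSketch and its fields are hypothetical, and doExport() only flips a flag where the real method builds and exports URLs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: the shouldExport / shouldDelay branching of export().
class DelayedExportSketch {
    private final ScheduledExecutorService executor;
    private final boolean export;      // stands in for the "export" attribute
    private final long delayMillis;    // stands in for the "delay" attribute
    private final AtomicBoolean exported = new AtomicBoolean(false);

    DelayedExportSketch(ScheduledExecutorService executor, boolean export, long delayMillis) {
        this.executor = executor;
        this.export = export;
        this.delayMillis = delayMillis;
    }

    // Returns the pending task when the export is delayed, otherwise null.
    synchronized ScheduledFuture<?> export() {
        if (!export) {
            return null;
        }
        if (delayMillis > 0) {
            return executor.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        }
        doExport();
        return null;
    }

    private void doExport() {
        exported.set(true);
    }

    boolean isExported() {
        return exported.get();
    }

    // Runs one scenario end to end and reports whether the service ended up exported.
    static boolean runDemo(boolean export, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            DelayedExportSketch service = new DelayedExportSketch(executor, export, delayMillis);
            ScheduledFuture<?> pending = service.export();
            if (pending != null) {
                pending.get(5, TimeUnit.SECONDS); // wait for the delayed export to finish
            }
            return service.isExported();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo(true, 0));
        System.out.println(runDemo(true, 20));
        System.out.println(runDemo(false, 0));
    }
}
```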

doExport() then calls doExportUrls(), which first loads the registry addresses we configured. Everything is driven by URLs, for example:

registry://ip:2181/org.apache.dubbo.registry.RegistryService/…

private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

registries holds the registry addresses we configured.

The Dubbo application name, protocol name, port and a series of other parameters are then appended to the address.

Then, if this is the provider side and the address's register parameter is enabled, the address is added to registryList, the collection of registry URLs.

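Once that loop finishes, control returns to doExportUrls(), which iterates over the protocols we configured (only dubbo in this example) and builds the service for each one.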

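Inside doExportUrlsFor1Protocol(), the parameters built so far are first collected into a map. The map can hold thirty-odd parameters at most; since some are not configured in this example, only 20 show up here.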

The map is then turned into a URL. This URL should look familiar: it is the address that ends up stored in ZooKeeper.

dubbo://192.168.124.7:20880/com.tqz.dubbo.api.ISayHelloService?anyhost=true&application=springboot-dubbo-provider&bean.name=ServiceBean:com.tqz.dubbo.api.ISayHelloService&bind.ip=192.168.124.7&bind.port=20880&cluster=failsafe&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.tqz.dubbo.api.ISayHelloService&methods=sayHello&pid=73828&qos-enable=false&register=true&release=2.7.2&side=provider&timeout=50000&timestamp=1616175325378
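How a parameter map becomes such a URL can be sketched without Dubbo's URL class. ProviderUrlSketch and buildUrl are hypothetical names; the sketch sorts keys alphabetically, which matches the order visible in the URL above, and it skips URL-encoding of values for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Sketch only: turns protocol, host, port, path and a parameter map into a URL string.
class ProviderUrlSketch {

    static String buildUrl(String protocol, String host, int port, String path, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        // TreeMap iterates its keys in alphabetical order.
        for (Map.Entry<String, String> entry : new TreeMap<>(params).entrySet()) {
            query.add(entry.getKey() + "=" + entry.getValue());
        }
        String base = protocol + "://" + host + ":" + port + "/" + path;
        return params.isEmpty() ? base : base + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("side", "provider");
        params.put("application", "springboot-dubbo-provider");
        params.put("methods", "sayHello");
        System.out.println(buildUrl("dubbo", "192.168.124.7", 20880,
                "com.tqz.dubbo.api.ISayHelloService", params));
    }
}
```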

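With the URL assembled, the service can be published. Still inside doExportUrlsFor1Protocol(), the export scope is read first. There are two kinds: local, for calls within the same JVM, which skip remote communication and go through an injvm://ip:port URL; and remote, which uses the URL assembled above for remote calls. If a registry address is configured, both are exported by default. Assuming the remote case here, the code iterates over registryURLs, i.e. the registry://ip:2181/org.apache.dubbo.registry.RegistryService/… address from earlier.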

        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export locally unless the scope is explicitly remote
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export remotely unless the scope is explicitly local
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (!isOnlyInJvm() && logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        // registryURL is: registry://ip:2181/org.apache.dubbo.registry.RegistryService/...
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        ......
                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                MetadataReportService metadataReportService = null;
                if ((metadataReportService = getMetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }
            }
        }
        this.urls.add(url);
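The two negated checks are easy to misread, so here is the same decision table as a standalone sketch. ExportScopeSketch, Target and targets are hypothetical names; the literal values "none", "remote" and "local" are assumed to be what SCOPE_NONE, SCOPE_REMOTE and SCOPE_LOCAL stand for.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch only: which exports happen for a given scope value.
class ExportScopeSketch {

    enum Target { LOCAL, REMOTE }

    static Set<Target> targets(String scope) {
        EnumSet<Target> result = EnumSet.noneOf(Target.class);
        if ("none".equalsIgnoreCase(scope)) {
            return result;                 // nothing is exported
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            result.add(Target.LOCAL);      // injvm export
        }
        if (!"local".equalsIgnoreCase(scope)) {
            result.add(Target.REMOTE);     // export through the registry or directly
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(targets(null));
        System.out.println(targets("remote"));
        System.out.println(targets("local"));
        System.out.println(targets("none"));
    }
}
```

An unset scope therefore yields both exports, which is the default case the text describes.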

Ignore the rest for now and go straight to the line

Exporter<?> exporter = protocol.export(wrapperInvoker). Here protocol is a member field:

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

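This is an adaptive extension point. As analyzed earlier, the export() and refer() methods of the Protocol interface both carry the @Adaptive annotation, so a Protocol$Adaptive class is generated at runtime; it is worth another look. Since the registryURLs above are registry://ip:2181/org.apache.dubbo.registry.RegistryService/…, extName inside the generated export() method must be registry.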

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {

    public void destroy() {
        throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }

    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        // extName is: registry
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}
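Stripped of the generated boilerplate, the adaptive class does one thing: it reads the protocol from the URL and looks up the extension with that name. The sketch below illustrates this with a plain map. AdaptiveDispatchSketch and its members are hypothetical; in Dubbo the lookup goes through ExtensionLoader, not a HashMap.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Sketch only: dispatching to an implementation chosen by the URL's protocol.
class AdaptiveDispatchSketch {

    interface Protocol {
        String export(String url);
    }

    // Stand-in for the name -> implementation table kept by the extension loader.
    static final Map<String, Protocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("registry", url -> "RegistryProtocol exported " + url);
        EXTENSIONS.put("dubbo", url -> "DubboProtocol exported " + url);
    }

    // Falls back to "dubbo" when the URL carries no protocol, as the generated class does.
    static String extensionName(String url) {
        String scheme = URI.create(url).getScheme();
        return scheme == null ? "dubbo" : scheme;
    }

    static String export(String url) {
        Protocol extension = EXTENSIONS.get(extensionName(url));
        if (extension == null) {
            throw new IllegalStateException("No Protocol extension for " + url);
        }
        return extension.export(url);
    }

    public static void main(String[] args) {
        System.out.println(export("registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService"));
    }
}
```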

Next, open the org.apache.dubbo.rpc.Protocol file under Dubbo's META-INF. It contains an entry whose key is registry, so the extension resolved at the end of export() should be RegistryProtocol.

Entering RegistryProtocol's export() method, the registry address and the provider address are obtained first.

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    // re-export when the service configuration is overridden, e.g. changed from the admin console
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    // start a Netty server
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}
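Earlier, doExportUrlsFor1Protocol() attached the provider URL to the registry URL with addParameterAndEncoded(EXPORT_KEY, ...). A plausible reading of getProviderUrl() is that it reverses this step. The sketch below shows such a round trip with JDK classes only: ExportParameterSketch, attach and extract are hypothetical names, and "export" is assumed to be the value of EXPORT_KEY.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Sketch only: carrying one URL inside a parameter of another.
class ExportParameterSketch {

    // Appends the encoded provider URL as the "export" parameter.
    static String attach(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        return registryUrl + separator + "export=" + encode(providerUrl);
    }

    // Finds the "export" parameter and decodes it, or returns null if it is absent.
    static String extract(String registryUrl) {
        int queryStart = registryUrl.indexOf('?');
        if (queryStart < 0) {
            return null;
        }
        for (String pair : registryUrl.substring(queryStart + 1).split("&")) {
            if (pair.startsWith("export=")) {
                return decode(pair.substring("export=".length()));
            }
        }
        return null;
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    private static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String registry = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo";
        String provider = "dubbo://192.168.124.7:20880/com.tqz.dubbo.api.ISayHelloService?side=provider";
        String combined = attach(registry, provider);
        System.out.println(combined);
        System.out.println(extract(combined));
    }
}
```

Encoding is what keeps the provider URL's own ? and & characters from being mistaken for the registry URL's parameters.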

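Now for how the server gets started. In doLocalExport(), bounds is a ConcurrentHashMap. The method uses computeIfAbsent, a JDK 1.8 feature: if the key is absent, the value is computed and put into the map. The service is then exposed through protocol.export(). The protocol here is wrapped in several layers:

  • QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol)))

so the innermost one is the dubbo protocol we configured.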

    @SuppressWarnings("unchecked")
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
		
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }
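Both ideas in doLocalExport(), the export-once cache and the nested wrappers, fit in a short sketch. LocalExportSketch and its members are hypothetical; the wrappers here only record their names, whereas the real ones add QoS startup, listeners and filters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: a wrapper chain around a core protocol, called at most once per key.
class LocalExportSketch {

    interface Protocol {
        String export(String key);
    }

    final List<String> trace = new ArrayList<>();                  // order in which layers were entered
    final Map<String, String> bounds = new ConcurrentHashMap<>();  // key -> exporter
    final Protocol protocol;

    LocalExportSketch() {
        Protocol core = key -> {
            trace.add("DubboProtocol");
            return "exporter:" + key;
        };
        // Outermost wrapper first, matching the nesting shown in the text.
        protocol = wrap("QosProtocolWrapper",
                wrap("ProtocolListenerWrapper",
                        wrap("ProtocolFilterWrapper", core)));
    }

    private Protocol wrap(String name, Protocol inner) {
        return key -> {
            trace.add(name);
            return inner.export(key);
        };
    }

    // The mapping function runs only when the key has no exporter yet.
    String doLocalExport(String key) {
        return bounds.computeIfAbsent(key, k -> protocol.export(k));
    }

    public static void main(String[] args) {
        LocalExportSketch sketch = new LocalExportSketch();
        sketch.doLocalExport("demo");
        sketch.doLocalExport("demo"); // served from the cache, no second export
        System.out.println(sketch.trace);
    }
}
```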

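Most of this method can be skipped; look at the two calls just before the return. It first opens a server that exposes the configured port 20880, then optimizes serialization.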

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackService = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackService) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }

openServer() calls createServer(). By default the server sends a readonly event when it closes and has heartbeats enabled. DEFAULT_REMOTING_SERVER defaults to netty; other options include mina and grizzly.

private ExchangeServer createServer(URL url) {
    url = URLBuilder.from(url)
            // send readonly event when server closes, it's enabled by default
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // enable heartbeat by default
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            .addParameter(CODEC_KEY, DubboCodec.NAME)
            .build();
    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }

    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    str = url.getParameter(CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }

    return server;
}
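The difference between addParameterIfAbsent and addParameter at the top of createServer() is the difference between a default and an override. The sketch below models it with a map. ServerDefaultsSketch and withDefaults are hypothetical, and the key names and values ("channel.readonly.sent", "heartbeat", "60000", "codec", "dubbo") are assumptions about what the constants resolve to.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: defaults that yield to user configuration versus a forced value.
class ServerDefaultsSketch {

    static Map<String, String> withDefaults(Map<String, String> configured) {
        Map<String, String> params = new LinkedHashMap<>(configured);
        // Applied only if the user did not configure them (addParameterIfAbsent).
        params.putIfAbsent("channel.readonly.sent", "true");
        params.putIfAbsent("heartbeat", "60000");
        // Always overwritten (addParameter).
        params.put("codec", "dubbo");
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> configured = new LinkedHashMap<>();
        configured.put("heartbeat", "5000");
        configured.put("codec", "custom");
        System.out.println(withDefaults(configured));
    }
}
```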

The call then goes through Exchangers.bind(url, requestHandler) ->

org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind ->

org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...) ->

org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind

which initializes a NettyServer:

// NettyTransporter
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

// NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

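The NettyServer constructor calls its parent's constructor, which in turn calls doOpen(). doOpen() is an abstract method, implemented by the NettyServer subclass seen above.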

  public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        ......
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
      ......
    }
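This is the template method pattern: the base constructor fixes the sequence and the subclass supplies the step. A minimal sketch follows, with hypothetical names (ServerTemplateSketch, FakeNettyServer) and no real networking.

```java
// Sketch only: a base constructor that calls an abstract doOpen() hook.
class ServerTemplateSketch {

    abstract static class AbstractServer {
        private final String address;

        AbstractServer(String address) {
            this.address = address;
            try {
                doOpen(); // implemented by the subclass
            } catch (Throwable t) {
                throw new IllegalStateException(
                        "Failed to bind " + getClass().getSimpleName() + " on " + address, t);
            }
        }

        String getAddress() {
            return address;
        }

        protected abstract void doOpen() throws Throwable;
    }

    static class FakeNettyServer extends AbstractServer {
        // No initializer on purpose: doOpen() runs inside super(...), before subclass
        // field initializers, so "= false" here would overwrite what doOpen() set.
        private boolean open;

        FakeNettyServer(String address) {
            super(address);
        }

        @Override
        protected void doOpen() {
            open = true; // a real server would bind its socket here
        }

        boolean isOpen() {
            return open;
        }
    }

    public static void main(String[] args) {
        FakeNettyServer server = new FakeNettyServer("0.0.0.0:20880");
        System.out.println(server.isOpen() + " " + server.getAddress());
    }
}
```

The comment on the open field points at the usual caveat of this design: the hook runs before the subclass is fully constructed.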

NettyServer provides the implementation below; once it returns, the service has been exposed.

@Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }
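
To recap, publishing a service involves:
  • Parsing the configuration through Spring and storing it in config objects
  • Assorted checks that keep the configuration valid
  • Assembling URLs: registry:// -> zookeeper:// -> dubbo:// -> injvm://
  • Building an Invoker (proxy)
  • RegistryProtocol.export()
  • Assorted wrappers (qos/filter/listener)
  • DubboProtocol.export() to publish the service
  • Starting a server via NettyServer#doOpen()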