直接进入服务导出入口 ServiceConfig#export 方法
//服务导出的入口 public synchronized void export() { if (!shouldExport()) { return; } //获取单例的 dubboBootstrap 并初始化 dubboBootstrap //这一步 boostrap 不是null, 因为 bootstrap 已经在 dubboBootstrapApplicationListener.onContextRefreshedEvent 方法中启动了 if (bootstrap == null) { bootstrap = dubboBootstrap.getInstance(); bootstrap.initialize(); } //检查并更新配置信息 checkAndUpdateSubConfigs(); //init serviceMetadata serviceMetadata.setVersion(getVersion()); serviceMetadata.setGroup(getGroup()); serviceMetadata.setDefaultGroup(getGroup()); serviceMetadata.setServiceType(getInterfaceClass()); serviceMetadata.setServiceInterfaceName(getInterface()); serviceMetadata.setTarget(getRef()); //如果需要延时导出,导出服务加入定时任务,否则立即导出 if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { doExport(); } exported(); }
ServiceConfig#export 方法主要就是校验,配置服务元数据信息,执行真正的导入方法 ServiceConfig#exported,最后调用导入成功之后的方法 ServiceConfig#exported
/** * 执行服务导入,由 {@link #export()} 调用 */ protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } //检查是否需要导出,如果不需要导出,则返回, // 极少情况下是不需要导出的,如本地debug if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } //从需要导出的 URL 列表中导出服务 doExportUrls(); }
这一步接近一个空壳方法,还是要调用 ServiceConfig#doExportUrls 方法从需要导出的 URL 列表中导出服务
进入 ServiceConfig#doExportUrls 方法
/** * 从需要导出的 URL 列表中导出服务,由 {@link #doExport}调用 */ @SuppressWarnings({"unchecked", "rawtypes"}) private void doExportUrls() { //向 repository 注册服务和服务提供者,服务提供者被包装成了一个 ProviderModel 模型 ServiceRepository repository = ApplicationModel.getServiceRepository(); ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass()); repository.registerProvider( getUniqueServiceName(), ref, serviceDescriptor, this, serviceMetadata ); //加载所有注册中心链接,这里是 dubbo 支持多协议多注册中心导出服务的关键 List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true); for (ProtocolConfig protocolConfig : protocols) { String pathKey = URL.buildKey(getcontextpath(protocolConfig) .map(p -> p + "/" + path) .orElse(path), group, version); //如果用户明确指定了路径,注册服务到其指定的路径上 // In case user specified path, register service one more time to map it to path. repository.registerService(pathKey, interfaceClass); // Todo, uncomment this line once service key is unified serviceMetadata.setServiceKey(pathKey); //根据协议导出服务 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
这一步是要导出URL 列表中的所有服务,它先向 repository 注册服务和服务提供者,服务提供者被包装成了一个 ProviderModel 模型,然后调用 ConfigValidationUtils#loadRegistries 方法加载所有注册中心连接,这一步是 dubbo 支持多协议多注册中心导出服务的关键,最后迭代不同的协议,调用 ServiceConfig#doExportUrlsFor1Protocol 方法根据协议导出服务。
有关注册中心的模型看下图,主要包含了注册中心的URL字符串,协议类型,名称,密码,IP地址,端口等。
进入 ConfigValidationUtils#loadRegistries 方法,看 dubbo 是如何加载注册中心
public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) { // check && override if necessary //检查并覆盖 URL中的一些属性 List<URL> registryList = new ArrayList<URL>(); //获取<dubbo:application .../>信息 ApplicationConfig application = interfaceConfig.getApplication(); //获取<dubbo:registry .../>信息 List<RegistryConfig> registries = interfaceConfig.getRegistries(); if (CollectionUtils.isNotEmpty(registries)) { for (RegistryConfig config : registries) { String address = config.getAddress(); if (StringUtils.isEmpty(address)) { // 若 address 为空,则将其设为 0.0.0.0 address = ANYHOST_VALUE; } if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { //定义一个保存属性的map 并将 application, config, AbstractInterfaceConfig 中的一些属性加入 map Map<String, String> map = new HashMap<String, String>(); AbstractConfig.appendParameters(map, application); AbstractConfig.appendParameters(map, config); map.put(PATH_KEY, RegistryService.class.getName()); AbstractInterfaceConfig.appendRuntimeParameters(map); // 如果 protocol 为空,默认使用 dubbo 协议 if (!map.containsKey(PROTOCOL_KEY)) { map.put(PROTOCOL_KEY, dubBO_PROTOCOL); } // 通过address 和 map 解析并生成 registry URL 列表 List<URL> urls = UrlUtils.parseURLs(address, map); for (URL url : urls) { url = URLBuilder.from(url) .addParameter(REGISTRY_KEY, url.getProtocol()) .setProtocol(extractRegistryType(url)) //// 将 URL 协议头设置为 registry 或 service-discovery-registry .build(); // 通过判断条件,决定是否添加 url 到 registryList 中,条件如下: // (服务提供者 && register = true 或 null) // || (非服务提供者 && subscribe = true 或 null) if ((provider && url.getParameter(REGISTER_KEY, true)) || (!provider && url.getParameter(SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
进入 ServiceConfig#doExportUrlsFor1Protocol 方法
/** * 不同的协议导出的 URL 不同 */ private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); // 如果协议名为空,或空串,则将协议名变量设置为 dubbo if (StringUtils.isEmpty(name)) { name = dubBO; } //将所有导出服务的信息添加到 map 中 Map<String, String> map = new HashMap<String, String>(); //side 为 provider, 表明导出的是服务的生产者 map.put(SIDE_KEY, PROVIDER_SIDE); // 添加版本、时间戳以及进程号等信息到 map 中 ServiceConfig.appendRuntimeParameters(map); AbstractConfig.appendParameters(map, getMetrics()); // 通过反射将对象的字段信息添加到 map 中 AbstractConfig.appendParameters(map, getApplication()); AbstractConfig.appendParameters(map, getModule()); // remove 'default.' prefix for configs from ProviderConfig // appendParameters(map, provider, Constants.DEFAULT_KEY); AbstractConfig.appendParameters(map, provider); AbstractConfig.appendParameters(map, protocolConfig); AbstractConfig.appendParameters(map, this); MetadataReportConfig MetadataReportConfig = getMetadataReportConfig(); if (MetadataReportConfig != null && MetadataReportConfig.isValid()) { map.putIfAbsent(MetaDATA_KEY, REMOTE_MetaDATA_STORAGE_TYPE); } //添加method 信息,getmethods() 返回 MethodConfig 列表, MethodConfig 中存储了 <dubbo:method> 标签的配置信息 if (CollectionUtils.isNotEmpty(getmethods())) { for (MethodConfig method : getmethods()) { // 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。 // 比如存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig, // 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"} AbstractConfig.appendParameters(map, method, method.getName()); // 检测 MethodConfig retry 是否为 false,若是,则设置重试次数为0 String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } // 获取 ArgumentConfig 列表 List<ArgumentConfig> arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { for (ArgumentConfig argument : arguments) { // convert argument type // 检测 type 属性是否为空,或者空串(分支1 ⭐️) if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getmethods(); // visit all methods if (methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // target the method, and get its signature if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); // one callback in the method if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // multiple callbacks in the method for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { AbstractConfig.appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); } } } } // end of methods for } // 检测 generic 是否为 "true",并根据检测结果向 map 中添加不同的信息 if (ProtocolUtils.isGeneric(generic)) { map.put(GENERIC_KEY, generic); map.put(METHODS_KEY, ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } // 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等 // 获取接口类的所有方法名称 String[] methods = Wrapper.getWrapper(interfaceClass).getmethodNames(); // 添加方法名到 map 中,如果包含多个方法名,则用逗号隔开,比如 methods = init,destroy if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { // 将逗号作为分隔符连接方法名,并将连接后的字符串放入 map 中 map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } /** * Here the token value configured by the provider is used to assign the value to ServiceConfig#token */ // 先用 token 属性保存 provider 的 token if(ConfigUtils.isEmpty(token) && provider != null) { token = provider.getToken(); } // 如果token 不为空 token,根据 token 是否为 “true” 或者 “default” 来判断是生成随机 token 还是将现有 token 加入 map 中 if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } //init serviceMetadata attachments serviceMetadata.getAttachments().putAll(map); // export service //服务导出 //生成导出服务的 URL, 协议://ip:端口号/服务接口名称?参数列表 String host = findConfigedHosts(protocolConfig, registryURLs, map); //获取服务 IP 地址 Integer port = findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, getcontextpath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); // You can customize Configurator to append extra parameters if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { // 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(ScopE_KEY); // scope 为 none 的时候不导出 // don't export when none is configured if (!ScopE_NONE.equalsIgnoreCase(scope)) { // export to local if the config is not remote (export to remote only when config is remote) // scope != remote ,导出到本地 if (!ScopE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) // scope != local,导出到远程 if (!ScopE_LOCAL.equalsIgnoreCase(scope)) { if (CollectionUtils.isNotEmpty(registryURLs)) { for (URL registryURL : registryURLs) { //if protocol is only injvm ,not register // 如果 协议为 injvm, 不用注册协议 if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); // 加载监视器链接 URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL); if (monitorUrl != null) { // 将监视器链接作为参数添加到 url 中 url = url.addParameterandEncoded(MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { if (url.getParameter(REGISTER_KEY, true)) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } else { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } } // For providers, this is used to enable custom proxy to generate invoker String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } // 为服务提供类(ref)生成 Invoker Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterandEncoded(EXPORT_KEY, url.toFullString())); // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 导出服务,并生成 Exporter /** * 这里调用的是 {@link RegistryProtocol#export(org.apache.dubbo.rpc.Invoker) */ Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); } // 不存在注册中心,仅导出服务 } else { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ WritableMetadataService MetadataService = WritableMetadataService.getExtension(url.getParameter(MetaDATA_KEY, DEFAULT_MetaDATA_STORAGE_TYPE)); if (MetadataService != null) { MetadataService.publishServiceDeFinition(url); } } } this.urls.add(url); }
ServiceConfig#doExportUrlsFor1Protocol 方法的作用主要是根据协议 protocolConfig 来导出服务,它主要执行了以下几步:
- STEP1. 将所有导出服务的属性添加到 map 中;
- STEP2. 解析MethodConfig,MethodConfig 就是 <dubbo:method> 标签;
- STEP3:获取要导出类的所有方法名称,将这些方法名称用 “,” 分隔保存在 map 中;
- STEP4:添加 token 信息、添加所有 serviceMetadata 的 attachments 属性到 map 中;
- STEP5:前几步都是导出的准备工作,主要是用 map 收集所有导出属性,从这一步才开始执行真正的导出工作。现获取当前服务的 host 和 port。然后用协议名称 name、主机地址host、端口 port、导出服务的全类名 path 和 收集到的 map 属性 生成 URL。生成的 URL 格式为 协议://ip:端口号/服务接口名称?参数列表,如 name = dubbo,host=192.168.11.11,port=20880,path = org.apache.dubbo.demo.DemoService,map={"side":"provider","methods":"sayHello,sayHelloAsync",...},那么生成的 URL 格式为:dubbo://192.168.11.11:20880/org.apache.dubbo.demo.DemoService?methods=sayHello,sayHelloAsync&side=provider;
- STEP6:获取 url 中的 scop 参数,如果 scop != none,代表要执行导出逻辑,先判断如果 scope != remote,调用 ServiceConfig#exportLocal 方法导出到本地,再判断如果 scope != local,代表要导出到远程,进一步再判断注册中心 registryURLs 是否为空,如果不为空,将服务注册到注册中心,如果 protocol == injvm,直接跳过不用注册服务,否则,先加载监视器 url,并将监视器 url作为参数添加到导出服务 url 中,然后将导出服务url 作为参数加到注册中心 url 中, 再根据真正的服务提供类 ref、服务接口 interfaceClass、服务注册中心 registryURL 调用 ProxyFactory#getInvoker 方法生成一个 invoker,dubbo 默认是用 Javassist 库作为字节码生成工具的,所有这里默认的也就是 JavassistProxyFactory#getInvoker 方法,用 DelegateProviderMetaDataInvoker 类包装 invoker 和 serviceConfig 信息生成 wrapperInvoker,最后调用 Protocol#export 方法生成导出服务对象 exporter,注册中心导出服务的方法一般是 RegistryProtocol#export,最后添加 exporter 到缓存 exporters 中。但是如果注册中心 registryURLs 为空,代表不存在注册中心,仅调用 Protocol#export 方法导出服务即可;
- STEP7:将导出服务 url 添加到 urls 缓存中
进入 ServiceConfig#exportLocal 方法,看看服务是如何导出到本地
/** * 服务导出到本地,就是导出到 JVM */ /** * always export injvm */ private void exportLocal(URL url) { URL local = URLBuilder.from(url) .setProtocol(LOCAL_PROTOCOL) .setHost(LOCALHOST_VALUE) .setPort(0) .build(); Exporter<?> exporter = PROTOCOL.export( PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local); }
ServiceConfig#exportLocal 方法很简单,就是用原 url 的值,替换 protocol=injvm,host=127.0.0.1,port=0 生成一个新的 url, 并调用 Protocol#export 方法导出服务即可
进入 JavassistProxyFactory#getInvoker 方法
@Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // Todo Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
JavassistProxyFactory#getInvoker 方法会先根据参数调用 Wrapper#getWrapper 方法生成一个 wrapper,再返回一个 AbstractProxyInvoker 匿名类对象。
进入 Wrapper#getWrapper 方法
/** * get wrapper. * * @param c Class instance. * @return Wrapper instance(not null). */ public static Wrapper getWrapper(Class<?> c) { while (ClassGenerator.isDynamicclass(c)) // can not wrapper on dynamic class. { c = c.getSuperclass(); } if (c == Object.class) { return OBJECT_WRAPPER; } return WRAPPER_MAP.computeIfAbsent(c, key -> makeWrapper(key)); }
Wrapper#getWrapper 方法会调用 Wrapper#makeWrapper 方法创建一个 wrapper,加入 WRAPPER_MAP 并返回。
进入 Wrapper#makeWrapper 方法
Todo
进入 RegistryProtocol#export 方法,看看是如何注册和导出的
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下: // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider URL registryUrl = getRegistryUrl(originInvoker); // url to export locally // 获取已注册的服务提供者 URL,比如: // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. // 获取订阅 URL,比如: // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); // 创建监听器 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker //导出服务 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); // decide if we need to delay publish boolean register = providerUrl.getParameter(REGISTER_KEY, true); //向注册中心注册服务 if (register) { register(registryUrl, registeredProviderUrl); } // register stated url on provider model registerStatedUrl(registryUrl, registeredProviderUrl, register); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); // Deprecated! Subscribe to override rules in 2.6.x or before. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //通知服务导出的监听 notifyExport(exporter); // 创建并返回 DestroyableExporter //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); }
RegistryProtocol#export 方法主要有两件事,导出服务和注册服务
- STEP1:前期准备:先获取支持中心URL,比如:zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
,再获取已注册服务的url ,比如:dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello,再获取订阅 URL,比如:provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello,根据 订阅url 和 originInvoker 生成监听器 OverrideListener 并加入 overrideListeners 缓存,根据 overrideSubscribeListener 覆盖原 providerUrl 中的一些信息。 - STEP2:调用 RegistryProtocol#doLocalExport 方法导出服务
- STEP3:调用 RegistryProtocol#register 方法向注册中心注册服务
- STEP4:调用 RegistryProtocol#notifyExport 方法通知服务导出监听
- STEP5:创建并返回 DestroyableExporter
进入 RegistryProtocol#doLocalExport 方法
/** * 导出服务, 调用 protocol#export() 导出,默认 export 为 {@link org.apache.dubbo.rpc.protocol.dubbo.dubboProtocol#export(org.apache.dubbo.rpc.Invoker)} * @param originInvoker * @param providerUrl * @param <T> * @return */ @SuppressWarnings("unchecked") private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); }
这个方法很简单,就是调用 Protocol#export 创建 Exporter, 包装成 ExporterChangeableWrapper 加入 bounds 缓存并返回,dubbo 默认使用的protocol 为 dubboProtocol,所以这里默认调用的是 dubboProtocol#export 方法
进入 dubboProtocol#export 方法,看看dubbo协议是如何导出服务的
@Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如: // demogroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880 // export service. String key = serviceKey(url); // 创建 dubboExporter dubboExporter<T> exporter = new dubboExporter<T>(invoker, key, exporterMap); // 将 <key, exporter> 键值对放入缓存中 exporterMap.put(key, exporter); // 本地存根相关代码 //export an stub service for dispatching event Boolean isstubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isstubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } } // 启动或刷新服务器 openServer(url); // 优化序列化 optimizeSerialization(url); return exporter; }
dubboProtocol#export 方法会先获取服务标识 key,服务标识由服务组名,服务名,服务版本号以及端口组成。比如:demogroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880,然后创建 dubboExporter对象,加入 exporterMap 缓存,然后调用 openServer(URL url) 启动服务,调用optimizeSerialization(url) 优化序列化,最后返回 exporter。
进入 openServer(URL url) 方法
private void openServer(URL url) { // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例 // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { //双重检查锁创建服务器并加入缓存 ProtocolServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { //创建服务器并加入缓存 serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override server.reset(url); } } }
openServer(URL url) 方法会先判断 server 是否为空,如果为空,DCL 双重检查锁调用 createServer(url) 方法创建一个 server 加入缓存并返回,如果不为空,reset 重置。
进入 createServer(url) 方法
/** * 创建服务器 */ private ProtocolServer createServer(URL url) { url = URLBuilder.from(url) // send readonly event when server closes, it's enabled by default //服务关闭时发送 readonly 事件 .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // enable heartbeat by default //添加心跳检测 .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) // 添加编码解码器参数 .addParameter(CODEC_KEY, dubboCodec.NAME) .build(); // 获取 server 参数,默认为 netty String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); // 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); } ExchangeServer server; try { // 创建 ExchangeServer server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } // 获取 client 参数,可指定 netty,mina str = url.getParameter(CLIENT_KEY); if (str != null && str.length() > 0) { // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina] Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); // 检测当前 dubbo 所支持的 Transporter 实现类名称列表中, // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常 if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return new dubboProtocolServer(server); }
createServer(url) 方法会先根据原 url 添加一些新的参数如readonly 事件,心跳检测,编解码生成一个新的 url,再获取 url 中的 server 参数,默认为 netty,通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,如果不存在,抛出异常,再调用 Exchangers#bind(URL, ExchangeHandler) 方法绑定url 和exchangeHandler 并返回一个 ExchangeServer,再通过 SPI 检测 url 中的 client 参数列表中是否都有支持的 Transporter,如果有一个不支持,抛出异常,最后生成一个 dubboProtocolServer 并返回
进入 Exchangers#bind(URL, ExchangeHandler) 方法
Todo
进入 RegistryProtocol#register 方法查看服务注册逻辑
private void register(URL registryUrl, URL registeredProviderUrl) { // 获取 Registry 并注册服务 Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registeredProviderUrl); }
一个空壳方法,实际调用的是 RegistryService#register 方法,这里默认调用到的方法是 ListenerRegistryWrapper.register 方法
进入 ListenerRegistryWrapper.register 方法
@Override public void register(URL url) { try { registry.register(url); } finally { if (CollectionUtils.isNotEmpty(listeners)) { RuntimeException exception = null; for (RegistryServiceListener listener : listeners) { if (listener != null) { try { listener.onRegister(url); } catch (RuntimeException t) { logger.error(t.getMessage(), t); exception = t; } } } if (exception != null) { throw exception; } } } }
ListenerRegistryWrapper.register 方法是一个 RegistryService 的装饰类,它内部保存了一个真正的 registry,比如注册中心使用zookeeper,那么就真正调用的是 ZookeeperRegistry#register 方法,并在注册完之后调用 RegistryServiceListener#onRegister 方法通知所有监听器
至此服务注册和导出完成
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。