Dubbo Source Code Analysis: Service Export
Service publishing: how it works
Publishing step 1: export the local service
Export dubbo service com.alibaba.dubbo.demo.DemoService to local registry, dubbo version: 2.0.0, current host: 127.0.0.1
Publishing step 2: export the remote service
Export dubbo service com.alibaba.dubbo.demo.DemoService to url dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
Register dubbo service com.alibaba.dubbo.demo.DemoService url dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&monitor=dubbo%3A%2F%2F192.168.48.117%3A2181%2Fcom.alibaba.dubbo.registry.RegistryService%3Fapplication%3Ddemo-provider%26backup%3D192.168.48.120%3A2181%2C192.168.48.123%3A2181%26dubbo%3D2.0.0%26owner%3Dwilliam%26pid%3D8484%26protocol%3Dregistry%26refer%3Ddubbo%253D2.0.0%2526interface%253Dcom.alibaba.dubbo.monitor.MonitorService%2526pid%253D8484%2526timestamp%253D1473908495729%26registry%3Dzookeeper%26timestamp%3D1473908495398&owner=william&pid=8484&side=provider&timestamp=1473908495465 to registry registry://192.168.48.117:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&backup=192.168.48.120:2181,192.168.48.123:2181&dubbo=2.0.0&owner=william&pid=8484&registry=zookeeper&timestamp=1473908495398, dubbo version: 2.0.0, current host: 127.0.0.1
Publishing step 3: start Netty
Start NettyServer bind /0.0.0.0:20880, export /192.168.100.38:20880, dubbo version: 2.0.0, current host: 127.0.0.1
Publishing step 4: open a connection to ZooKeeper
INFO zookeeper.ClientCnxn: opening socket connection to server /192.168.48.117:2181
Publishing step 5: register with ZooKeeper
Register: dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
Publishing step 6: subscribe to ZooKeeper
Subscribe: provider://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
Notify urls for subscribe url provider://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, urls: [empty://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465], dubbo version: 2.0.0, current host: 127.0.0.1
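What is the difference between exporting a local service and exporting a remote service?
1. Local export: the service is exposed inside the same JVM, so calls do not go through ZooKeeper or any remote communication. For example, when a service calls its own interface within the same process, there is no need to open a network connection.
2. Remote export: the service's IP address and port are exposed to remote clients, and communication happens over the network.
What is the difference between persistent and ephemeral nodes in ZooKeeper?
Persistent node: once created, it stays in ZooKeeper until it is explicitly deleted.
Ephemeral node: bound to the client session; once the session expires, all ephemeral nodes created by that client are deleted.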
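To make the distinction concrete, here is a toy in-memory model in Java. It is only a sketch: `ToyNodeStore` and its methods are made-up names, not the ZooKeeper client API. It shows just the one property that matters here: nodes tied to a session disappear when the session ends, while persistent nodes stay.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, NOT the ZooKeeper API): path -> owning session id,
// where a null owner marks a persistent node.
class ToyNodeStore {
    private final Map<String, Long> nodes = new HashMap<>();

    void create(String path, boolean ephemeral, long sessionId) {
        nodes.put(path, ephemeral ? Long.valueOf(sessionId) : null);
    }

    // When a session expires, every ephemeral node it created is removed.
    void closeSession(long sessionId) {
        nodes.values().removeIf(owner -> owner != null && owner == sessionId);
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }
}
```

This is why a provider URL registered as an ephemeral node needs no cleanup code: when the provider process dies, its session expires and the entry goes away with it.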
First, here is the complete call trace of the service export process.
ServiceBean.onApplicationEvent
-->export()
-->ServiceConfig.export()
-->doExport()
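-->doExportUrls()//contains a for loop: one service can be exported over several protocols (e.g. a TCP-based protocol or http); the default is the TCP-based dubbo protocol
-->loadRegistries(true)//assembles the registry URL information from dubbo.properties
-->doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)
//export locally if the configuration is not remote (remote means only the remote service is exported)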
-->exportLocal(URL url)
-->proxyFactory.getInvoker(ref, (Class) interfaceClass, local)
-->
ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
-->extension.getInvoker(arg0, arg1, arg2)
-->StubProxyFactoryWrapper.getInvoker(T proxy, Class<T> type, URL url)
-->proxyFactory.getInvoker(proxy, type, url)
-->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
-->Wrapper.getWrapper(com.alibaba.dubbo.demo.provider.DemoServiceImpl)
-->makeWrapper(Class<?> c)
-->return new AbstractProxyInvoker<T>(proxy, type, url)
-->protocol.export
->ProtocolFilterWrapper
-->Protocol$Adpative.export
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("injvm");
-->extension.export(arg0)
-->ProtocolFilterWrapper.export
-->buildInvokerChain //creates 8 filters
-->ProtocolListenerWrapper.export
-->InjvmProtocol.export
-->return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap)
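-->purpose: exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService, this=InjvmExporter
//export remotely if the configuration is not local (local means only the local service is exported)
-->proxyFactory.getInvoker//same principle as local export: the goal is to obtain an Invoker object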
-->protocol.export(invoker)
-->Protocol$Adpative.export
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry");
-->extension.export(arg0)
-->ProtocolFilterWrapper.export
-->ProtocolListenerWrapper.export
-->RegistryProtocol.export
-->doLocalExport(originInvoker)
-->getCacheKey(originInvoker);//reads dubbo://192.168.100.51:20880/
-->protocol.export
-->Protocol$Adpative.export
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("dubbo");
-->extension.export(arg0)
-->ProtocolFilterWrapper.export
-->buildInvokerChain//creates 8 filters
-->ProtocolListenerWrapper.export
---------1. Netty service export starts------- -->DubboProtocol.export
-->serviceKey(url)//builds key=com.alibaba.dubbo.demo.DemoService:20880
-->purpose: exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService:20880, this=DubboExporter
-->openServer(url)
-->createServer(url)
--------2. Exchange layer (exchanger) starts-------------->Exchangers.bind(url, requestHandler)//the exchanger is the information exchange layer
-->getExchanger(url)
-->getExchanger(type)
-->ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension("header")
-->HeaderExchanger.bind
-->Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
-->new HeaderExchangeHandler(handler)//this.handler = handler
-->new DecodeHandler
-->new AbstractChannelHandlerDelegate//this.handler = handler;
---------3. Transport layer (transporter)--------------------->Transporters.bind
-->getTransporter()
-->ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
-->Transporter$Adpative.bind
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension("netty");
-->extension.bind(arg0, arg1)
-->NettyTransporter.bind
-->new NettyServer(url, listener)
-->AbstractPeer //this.url = url; this.handler = handler;
-->AbstractEndpoint//codec timeout=1000 connectTimeout=3000
-->AbstractServer //bindAddress accepts=0 idleTimeout=600000
---------4. Open the port and expose the Netty service-------------------------------->doOpen()
-->sets up NioServerSocketChannelFactory with the boss and worker thread pools; the thread count is 3
-->sets up the codec handlers
-->bootstrap.bind(getBindAddress())
-->new HeaderExchangeServer
-->this.server=NettyServer
-->heartbeat=60000
-->heartbeatTimeout=180000
-->startHeatbeatTimer()//a heartbeat timer backed by a thread pool; if the connection drops, it reconnects on heartbeat
-->getRegistry(originInvoker)//connects to ZooKeeper
-->registryFactory.getRegistry(registryUrl)
-->ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper");
-->extension.getRegistry(arg0)
-->AbstractRegistryFactory.getRegistry//creates a registry and stores it in REGISTRIES
-->createRegistry(url)
-->new ZookeeperRegistry(url, zookeeperTransporter)
-->AbstractRegistry
-->loadProperties()//purpose: load the contents of C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache as properties
-->notify(url.getBackupUrls())//does nothing here
-->FailbackRegistry
-->retryExecutor.scheduleWithFixedDelay(new Runnable()//creates a thread pool that checks the registry connection and retries if it has failed
-->ZookeeperRegistry
-->zookeeperTransporter.connect(url)
-->ZookeeperTransporter$Adpative.connect(url)
-->ExtensionLoader.getExtensionLoader(ZookeeperTransporter.class).getExtension("zkclient");
-->extension.connect(arg0)
-->ZkclientZookeeperTransporter.connect
-->new ZkclientZookeeperClient(url)
-->AbstractZookeeperClient
-->ZkclientZookeeperClient
-->new ZkClient(url.getBackupAddress());//connects to ZooKeeper
-->client.subscribeStateChanges(new IZkStateListener()//subscribes to connection state changes: disconnected, reconnected
-->zkClient.addStateListener(new StateListener()
-->recover //reconnect after a connection failure
-->registry.register(registedProviderUrl)//creates the node
-->AbstractRegistry.register
-->FailbackRegistry.register
-->doRegister(url)//sends the registration request to the ZooKeeper server
-->ZookeeperRegistry.doRegister
-->zkClient.create
-->AbstractZookeeperClient.create//dubbo/com.alibaba.dubbo.demo.DemoService/providers/
dubbo%3A%2F%2F192.168.100.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26
application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3D
com.alibaba.dubbo.demo.DemoService%26loadbalance%3Droundrobin%26methods%3DsayHello%26owner%3
Dwilliam%26pid%3D2416%26side%3Dprovider%26timestamp%3D1474276306353
-->createEphemeral(path);//ephemeral node dubbo%3A%2F%2F192.168.100.52%3A20880%2F.............
-->createPersistent(path);//persistent node dubbo/com.alibaba.dubbo.demo.DemoService/providers
-->registry.subscribe//subscribes to ZooKeeper
-->AbstractRegistry.subscribe
-->FailbackRegistry.subscribe
-->doSubscribe(url, listener)// sends the subscription request to the server
-->ZookeeperRegistry.doSubscribe
-->new ChildListener()
-->implements childChanged
-->implements and runs ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
//A
-->zkClient.create(path, false);//step 1: first create the persistent node /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
-->zkClient.addChildListener(path, zkListener)
-->AbstractZookeeperClient.addChildListener
//C
-->createTargetChildListener(path, listener)//step 3: handling after a notification arrives, delegated to FailbackRegistry.notify
-->ZkclientZookeeperClient.createTargetChildListener
-->new IZkChildListener()
-->implements handleChildChange //handling after a notification arrives
-->listener.childChanged(parentPath, currentChilds);
-->implements and runs ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
-->after a notification arrives, FailbackRegistry.notify handles it
//B
-->addTargetChildListener(path, targetListener)//step 2
-->ZkclientZookeeperClient.addTargetChildListener
-->client.subscribeChildChanges(path, listener)//step 2: start the subscription on /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
-->notify(url, listener, urls)
-->FailbackRegistry.notify
-->doNotify(url, listener, urls);
-->AbstractRegistry.notify
-->saveProperties(url);//writes the provider's registered URL information to C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
-->registryCacheExecutor.execute(new SaveProperties(version));//done on a thread pool
-->listener.notify(categoryList)
-->RegistryProtocol.notify
-->RegistryProtocol.this.getProviderUrl(originInvoker)//obtains the provider URL from the invoker's URL
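In the trace, every protocol.export call passes through ProtocolFilterWrapper and ProtocolListenerWrapper before it reaches the concrete protocol (InjvmProtocol, RegistryProtocol, DubboProtocol). That layering is a plain decorator chain. The sketch below shows the idea only; `MiniProtocol`, `CoreProtocol`, `NamedWrapper` and `WrapperChainDemo` are hypothetical stand-ins, not Dubbo's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a protocol with a single export method.
interface MiniProtocol {
    String export(String serviceKey, List<String> callLog);
}

// Innermost protocol: performs the actual export.
class CoreProtocol implements MiniProtocol {
    public String export(String serviceKey, List<String> callLog) {
        callLog.add("CoreProtocol");
        return "exporter:" + serviceKey;
    }
}

// A wrapper records its own step, then delegates to the protocol it wraps.
class NamedWrapper implements MiniProtocol {
    private final String name;
    private final MiniProtocol delegate;

    NamedWrapper(String name, MiniProtocol delegate) {
        this.name = name;
        this.delegate = delegate;
    }

    public String export(String serviceKey, List<String> callLog) {
        callLog.add(name);
        return delegate.export(serviceKey, callLog);
    }
}

class WrapperChainDemo {
    // Builds FilterWrapper -> ListenerWrapper -> CoreProtocol and returns the call order.
    static List<String> run() {
        MiniProtocol chain = new NamedWrapper("FilterWrapper",
                new NamedWrapper("ListenerWrapper", new CoreProtocol()));
        List<String> callLog = new ArrayList<>();
        chain.export("com.alibaba.dubbo.demo.DemoService", callLog);
        return callLog;
    }
}
```

The caller only ever sees one `MiniProtocol`; which wrappers sit in front of the core protocol is decided when the chain is assembled.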
Now let's go through the individual steps in detail.
ServiceBean.onApplicationEvent
ServiceBean.onApplicationEvent method
ServiceBean.afterPropertiesSet method
AnnotationBean.postProcessAfterInitialization method
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (isDelay() && !isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // export the service
        export();
    }
}
-->export() method
public synchronized void export() {
    // take export and delay from the provider config if they are not set on the service
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    if (export != null && !export) {
        return;
    }
    // delayed export, if a delay is configured
    if (delay != null && delay > 0) {
        delayExportExecutor.schedule(new Runnable() {
            public void run() {
                doExport();
            }
        }, delay, TimeUnit.MILLISECONDS);
    } else {
        // export the service right away
        doExport();
    }
}
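The "export now or after a delay" branch can be tried out in isolation. The following is a minimal sketch, not Dubbo's code: `DelayedExportSketch` and its fields are illustrative names, and the daemon thread factory is my own choice so the example does not keep the JVM alive.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the delay branch; names are illustrative only.
class DelayedExportSketch {
    final AtomicBoolean exported = new AtomicBoolean(false);

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delay-export-sketch");
                t.setDaemon(true); // do not keep the JVM alive for a pending export
                return t;
            });

    void export(Integer delayMillis) {
        if (delayMillis != null && delayMillis > 0) {
            // export later, on the scheduler thread
            scheduler.schedule(this::doExport, delayMillis.longValue(), TimeUnit.MILLISECONDS);
        } else {
            // no delay configured: export on the calling thread
            doExport();
        }
    }

    private void doExport() {
        exported.set(true);
    }
}
```

With a null or non-positive delay the service is exported before `export` returns; with a positive delay the caller returns immediately and the export happens later on the scheduler thread.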
-->doExport() method (excerpt)
// checks <dubbo:application name="Trade" owner="Trade" organization="cpcn" version="v1.0.0" />; throws an exception if it is not configured
checkApplication();
// checks <dubbo:registry file="${dubbo.registry.file}" address="${dubbo.registry.address}" />; throws an exception if it is not configured
checkRegistry();
// if no protocol is configured, sets a new ProtocolConfig()
checkProtocol();
appendProperties(this);
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
    path = interfaceName;
}
// export the service URLs
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
Look at ServiceConfig.doExportUrls():
private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
    // one service can be exported over several protocols (e.g. a TCP-based protocol or http); the default is the TCP-based dubbo protocol
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
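Since doExportUrlsFor1Protocol receives the whole registry list for each protocol, the overall shape is a nested loop. The sketch below assumes that each protocol is exported once per registry URL, which the excerpt above does not show; plain strings stand in for ProtocolConfig and registry URLs, and `ExportLoopSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: strings stand in for ProtocolConfig and registry URLs.
class ExportLoopSketch {
    static List<String> exportAll(List<String> protocols, List<String> registries) {
        List<String> exports = new ArrayList<>();
        for (String protocol : protocols) {        // outer loop, as in doExportUrls()
            for (String registry : registries) {   // assumed: one export per registry
                exports.add(protocol + " -> " + registry);
            }
        }
        return exports;
    }
}
```

Under that assumption, two protocols and two registries produce four exports.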
Next, ServiceConfig.doExportUrlsFor1Protocol()
Local export
Remote export
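The rule noted in the call trace (local export is skipped when the configuration is remote, remote export is skipped when it is local) boils down to two small predicates. A minimal sketch: `ScopeSketch` and its method names are hypothetical, only the two values local and remote come from the trace, and the case-insensitive comparison is an assumption.

```java
// Sketch of the scope rule; only the values "local" and "remote" come from the trace.
class ScopeSketch {
    // Local (in-JVM) export happens unless the configuration says "remote".
    static boolean exportsLocally(String scope) {
        return !"remote".equalsIgnoreCase(scope);
    }

    // Remote export happens unless the configuration says "local".
    static boolean exportsRemotely(String scope) {
        return !"local".equalsIgnoreCase(scope);
    }
}
```

With nothing configured, both predicates are true, which matches the startup log at the top where the service is exported locally first and then remotely.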
RegistryProtocol.export()
Although this method is named doLocalExport(), it actually goes through the remote export path.
The protocol.export() call above enters the export method of DubboProtocol.
DubboProtocol.export()
Start Netty
private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}
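What openServer does is keep one server per address: the first export on a given host:port creates the server, later exports on the same address reuse it. A stripped-down sketch of that caching follows. `ServerCacheSketch` is a hypothetical name, a string stands in for the ExchangeServer, the reset(url) branch is left out, and `computeIfAbsent` is a swapped-in idiom where the quoted code uses get/put.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: one server per "host:port" key, created lazily and reused afterwards.
class ServerCacheSketch {
    private final Map<String, String> serverMap = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    String openServer(String address) {
        // computeIfAbsent is a swapped-in idiom; the quoted code uses get/put instead.
        return serverMap.computeIfAbsent(address, key -> {
            created.incrementAndGet(); // counts how many servers were actually started
            return "server@" + key;
        });
    }
}
```

So ten services exported on port 20880 still share a single Netty server.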
Pay attention to how requestHandler is used here:
try {
    server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
    Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
    if (!supportedTypes.contains(str)) {
        throw new RpcException("Unsupported client type: " + str);
    }
}
return server;
How the server handles a received request: see the reply method
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        Invoker<?> invoker = getInvoker(channel, inv);
        // need to consider backward-compatibility if it's a callback
        if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
            String methodsstr = invoker.getUrl().getParameters().get("methods");
            boolean hasMethod = false;
            if (methodsstr == null || methodsstr.indexOf(",") == -1) {
                hasMethod = inv.getMethodName().equals(methodsstr);
            } else {
                String[] methods = methodsstr.split(",");
                for (String method : methods) {
                    if (inv.getMethodName().equals(method)) {
                        hasMethod = true;
                        break;
                    }
                }
            }
            if (!hasMethod) {
                logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                return null;
            }
        }
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        return invoker.invoke(inv);
    }
    throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
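The callback branch in reply is just a membership test: is the invoked method listed in the comma-separated methods parameter of the invoker's URL? Pulled out on its own it could look like the sketch below. `CallbackMethodCheckSketch` and `hasMethod` are hypothetical names; the logic mirrors the branch above for ordinary inputs.

```java
import java.util.Arrays;

// Sketch of the "is this method exported?" check from the callback branch.
class CallbackMethodCheckSketch {
    // methodsParam is the comma-separated "methods" URL parameter, e.g. "sayHello,sayBye".
    static boolean hasMethod(String methodsParam, String methodName) {
        if (methodsParam == null) {
            return false; // no methods parameter: nothing can match
        }
        return Arrays.asList(methodsParam.split(",")).contains(methodName);
    }
}
```

When the check fails, reply logs a warning and returns null instead of invoking anything.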
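Continuing with how Netty is opened
After the remote service is exported and Netty is started, the service is registered with ZooKeeper
RegistryProtocol.export()
You may know about the local setting dubbo.registry.file=/root/.dubbo/Accountant-registry-172.31.1.187.cache. So when is this file loaded?
getRegistry takes a lock internally, and createRegistry returns a ZookeeperRegistry.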
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    // pay special attention to the super(url) call
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}
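The group handling in this constructor decides the root node under which everything is registered. Here is a small sketch of that normalisation. `RootPathSketch` is a hypothetical name, and two things are assumptions on my part: that DEFAULT_ROOT is "dubbo" (consistent with the /dubbo/... paths shown in this article) and that an empty group falls back to the default.

```java
// Sketch of the group-to-root normalisation in the constructor above.
class RootPathSketch {
    static final String PATH_SEPARATOR = "/";
    static final String DEFAULT_ROOT = "dubbo"; // assumption, matches the /dubbo/... paths in the logs

    static String toRoot(String group) {
        // assumption: a missing or empty group falls back to the default root
        String root = (group == null || group.isEmpty()) ? DEFAULT_ROOT : group;
        return root.startsWith(PATH_SEPARATOR) ? root : PATH_SEPARATOR + root;
    }
}
```

Registries configured with different groups therefore live under different root nodes and do not see each other's providers.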
Next, look at the super(url) call inside:
public FailbackRegistry(URL url) {
    super(url);
    int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            // Check and connect to the registry
            try {
                retry();
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at Failed retry, cause: " + t.getMessage(), t);
            }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
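The idea behind the retry timer is "failback": an operation that fails is remembered and tried again later rather than dropped. A minimal sketch of that pattern, limited to registration, is below. `FailbackSketch` and its members are hypothetical names, and `retry()` is called by hand here to keep the example deterministic, whereas the constructor above schedules it periodically.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Sketch of the failback idea: remember failed registrations and retry them later.
class FailbackSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Predicate<String> doRegister; // returns true when the registry accepted the URL

    FailbackSketch(Predicate<String> doRegister) {
        this.doRegister = doRegister;
    }

    void register(String url) {
        if (!doRegister.test(url)) {
            failedRegistered.add(url); // keep it for the next retry round
        }
    }

    // One retry round: every URL that now registers successfully is dropped from the set.
    void retry() {
        failedRegistered.removeIf(doRegister);
    }

    int pending() {
        return failedRegistered.size();
    }
}
```

A provider that starts while the registry is unreachable is picked up on a later retry round once the registry is back.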
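Look at this:
After the cache file is loaded, the listeners are notified, provided a listener is already subscribed to this service; otherwise nothing happens.
Let's see what this file looks like.
Where is the loaded cache file used?
If subscribing fails, the service information is read from the cache file. In other words, services can still be invoked even when ZooKeeper is down.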
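The mechanism is easy to reproduce with java.util.Properties, which is what the loadProperties()/saveProperties() names in the trace suggest. The sketch below is an illustration under assumptions, not Dubbo's code: `RegistryCacheSketch` is a hypothetical name, and the layout (service name as key, provider URL as value) is assumed.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Sketch: persist the last known provider URL and read it back when the registry is unreachable.
class RegistryCacheSketch {
    static void save(Path file, String service, String providerUrls) {
        Properties props = new Properties();
        props.setProperty(service, providerUrls);
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "registry cache sketch");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String load(Path file, String service) {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(service);
    }

    // Writes to a temporary file and reads the value back, then cleans up.
    static String roundTrip(String service, String providerUrls) {
        try {
            Path file = Files.createTempFile("dubbo-registry-sketch", ".cache");
            try {
                save(file, service, providerUrls);
                return load(file, service);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The cached value is only as fresh as the last successful notification, so it is a fallback for registry outages and not a replacement for the registry.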
When is this cache updated?
At subscription time: the subsequent notify call updates it.
The cache has been rewritten:
/dubbo/cpcn.payment.shwealthbank401.rpc.ShwbAccountService/providers/dubbo%3A%2F%2F192.168.140.43%3A7101%2Fcpcn.payment.shwealthbank401.rpc.ShwbAccountService%3Faccepts%3D2000%26anyhost%3Dtrue%26application%3DSHWealthBank401%26application.version%3Dv1.0.0%26charset%3DUTF-8%26delay%3D-1%26dispatcher%3Dmessage%26dubbo%3D2.0.1%26executes%3D500%26generic%3Dfalse%26interface%3Dcpcn.payment.shwealthbank401.rpc.ShwbAccountService%26methods%3Dtx4711%2Ctx4710%2Ctx4706%2Ctx4705%2Ctx4708%2Ctx4707%2Ctx4702%2Ctx4701%2Ctx4712%2Ctx4704%2Ctx4703%2Ctx4709%26organization%3Dcpcn%26owner%3DSHWealthBank401%26pid%3D29716%26queues%3D0%26revision%3DSNAPSHOT%26serialization%3Dhessian2%26side%3Dprovider%26threadpool%3Dcached%26threads%3D500%26timeout%3D15000%26timestamp%3D1546070995869%26transporter%3Dnetty%26version%3D1.0.0
The registration information carries a version number.
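The provider node name is simply the provider URL, URL-encoded, placed under the service's providers node. The sketch below rebuilds such a path with java.net.URLEncoder; `ProviderPathSketch` and `providerPath` are hypothetical names, and the input URL is shortened for readability.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Sketch: build the ZooKeeper path of a provider node from its URL.
class ProviderPathSketch {
    static String providerPath(String root, String interfaceName, String providerUrl) {
        try {
            // The whole provider URL becomes a single, URL-encoded node name.
            return root + "/" + interfaceName + "/providers/"
                    + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }
}
```

Decoding the last path segment gives back the full provider URL, including parameters such as version and timeout.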
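After registering with ZooKeeper, subscribe to the service
Under the hood this still goes through ZooKeeper.
The configurators node mainly records configuration information for the service.
In summary, service export is the process of converting an Invoker into an Exporter.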
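As a closing illustration, here is that conversion reduced to its bare bones. `SketchInvoker` and `SketchExporter` are hypothetical, pared-down roles and not Dubbo's interfaces; the only part taken from the trace is that the exporter puts itself into an exporterMap under the service key.

```java
import java.util.Map;

// Hypothetical, pared-down version of the Invoker role: something that can be called.
interface SketchInvoker {
    Object invoke(String methodName, Object[] args);
}

// Hypothetical, pared-down version of the Exporter role: a published Invoker.
class SketchExporter {
    private final SketchInvoker invoker;
    private final String key;
    private final Map<String, SketchExporter> exporterMap;

    SketchExporter(SketchInvoker invoker, String key, Map<String, SketchExporter> exporterMap) {
        this.invoker = invoker;
        this.key = key;
        this.exporterMap = exporterMap;
        exporterMap.put(key, this); // mirrors exporterMap.put(key, this) in the trace
    }

    SketchInvoker getInvoker() {
        return invoker;
    }

    // Withdrawing the service removes it from the map again.
    void unexport() {
        exporterMap.remove(key);
    }
}
```

The Invoker knows how to call the implementation; the Exporter is what makes it reachable under a key and what lets it be withdrawn later.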