Dubbo Service Reference: Source Code Analysis of Building the Client
Preparation:
Start a provider:
dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18594&side=provider&timestamp=1551525684453
[Figure: inheritance and implementation hierarchy of ReferenceBean]
When DemoService demoService = (DemoService) context.getBean("demoService") executes, ReferenceBean is a FactoryBean, so the bean is obtained through FactoryBean.getObject.
Here is the core code of ReferenceBean:
// ReferenceBean.getObject(): returns the service proxy instance
public Object getObject() throws Exception {
return get();
}
// ReferenceConfig.get(): returns the service proxy instance, initializing it on first use
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
// ReferenceConfig.init(): initialization
private void init() {
...
ref = createProxy(map); // create the proxy
}
// ReferenceConfig.createProxy(): creates the service proxy
private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {
// urls.get(0), decoded:
// registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=18793&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18793&register.ip=172.16.132.166&side=consumer&timestamp=1551526492998&registry=zookeeper&timestamp=1551526495185
invoker = refprotocol.refer(interfaceClass, urls.get(0)); // service reference ###1
}
...
// create the service proxy
return (T) proxyFactory.getProxy(invoker); // ###2
}
The two lines marked ###1 and ###2 above are the core of the whole process.
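To make the get()/createProxy() shape concrete, here is a minimal, self-contained sketch: a lazily initialized reference whose get() builds a JDK dynamic proxy around an invoker on first use. It is not Dubbo's code. SketchInvoker, GreetingService, LazyReference and initCount are hypothetical names invented for this illustration, and the JDK proxy merely stands in for whatever proxyFactory.getProxy produces.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for an Invoker: performs the call for a given method.
interface SketchInvoker {
    Object invoke(String methodName, Object[] args);
}

// Hypothetical service interface used only by this demo.
interface GreetingService {
    String sayHello(String name);
}

// Mirrors the get()/createProxy() shape of ReferenceConfig in simplified form.
class LazyReference<T> {
    private final Class<T> interfaceClass;
    private final SketchInvoker invoker;
    private T ref;        // created on the first get()
    int initCount = 0;    // exposed only so the demo can observe the lazy init

    LazyReference(Class<T> interfaceClass, SketchInvoker invoker) {
        this.interfaceClass = interfaceClass;
        this.invoker = invoker;
    }

    synchronized T get() {
        if (ref == null) {
            ref = createProxy();
            initCount++;
        }
        return ref;
    }

    private T createProxy() {
        // Every interface method call is forwarded to the invoker.
        InvocationHandler handler =
                (proxy, method, args) -> invoker.invoke(method.getName(), args);
        Object instance = Proxy.newProxyInstance(
                interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, handler);
        return interfaceClass.cast(instance);
    }
}

class LazyReferenceDemo {
    static String demo() {
        SketchInvoker fake = (methodName, args) -> "Hello " + args[0] + " via " + methodName;
        LazyReference<GreetingService> reference =
                new LazyReference<>(GreetingService.class, fake);
        GreetingService service = reference.get();
        reference.get(); // the second call reuses the same proxy
        return service.sayHello("world") + ", init count " + reference.initCount;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller only ever sees the interface; the invoker behind the proxy can be swapped without the caller noticing, which is the point of the two core lines.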
1. Using Protocol to convert interfaceClass into an Invoker
invoker = refprotocol.refer(interfaceClass, urls.get(0)); // service reference ###1
Here refprotocol is a Protocol$Adaptive instance.
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
...
public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
...
}
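The dispatch performed by the generated adaptive class can be pictured with a small sketch: read the protocol part of the URL, look up the extension registered under that name, and delegate to it. SketchProtocol, AdaptiveProtocolSketch, the String-based URL and the plain map of extensions are assumptions made for illustration; the real class resolves extensions through ExtensionLoader.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified protocol interface.
interface SketchProtocol {
    String refer(String url);
}

class AdaptiveProtocolSketch implements SketchProtocol {
    private final Map<String, SketchProtocol> extensions = new HashMap<>();

    void register(String name, SketchProtocol extension) {
        extensions.put(name, extension);
    }

    // Extracts the scheme of the url, falling back to "dubbo" when there is none.
    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx < 0 ? "dubbo" : url.substring(0, idx);
    }

    @Override
    public String refer(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String extName = protocolOf(url);
        SketchProtocol extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.refer(url);
    }
}

class AdaptiveProtocolDemo {
    static String demo() {
        AdaptiveProtocolSketch adaptive = new AdaptiveProtocolSketch();
        adaptive.register("registry", url -> "RegistryProtocol <- " + url);
        adaptive.register("dubbo", url -> "DubboProtocol <- " + url);
        return adaptive.refer("registry://localhost:2181/demo");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```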
In Protocol$Adaptive.refer, extName is "registry" here. The call then passes through ProtocolListenerWrapper.refer -> ProtocolFilterWrapper.refer -> RegistryProtocol.refer; for the registry protocol the first two wrappers simply delegate. Here is the core code of RegistryProtocol.refer:
// Refer to the remote service
// url: registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=18793&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18793&register.ip=172.16.132.166&side=consumer&timestamp=1551526492998&registry=zookeeper&timestamp=1551526495185
// type: interface com.alibaba.dubbo.demo.DemoService
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// url: zookeeper://....
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// connect to the ZK registry
Registry registry = registryFactory.getRegistry(url);
...
// refer to the service
return doRefer(cluster, registry, type, url);
}
Parameters:
- url: registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=18793&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18793&register.ip=172.16.132.166&side=consumer&timestamp=1551526492998&registry=zookeeper&timestamp=1551526495185
- type: interface com.alibaba.dubbo.demo.DemoService
After the first line executes, the protocol has been replaced and the registry parameter removed, so url is now:
zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=19009&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19009&register.ip=172.16.132.166&side=consumer&timestamp=1551527300192&timestamp=1551527302398
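The protocol swap can be sketched in isolation: take the value of the registry parameter as the new protocol and drop that parameter. RegistryUrlSketch and toConcreteRegistry are hypothetical names, the URL is reduced to a protocol, an address and a parameter map, and the "dubbo" fallback is assumed to correspond to Constants.DEFAULT_REGISTRY.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical immutable mini-URL: protocol, address and query parameters only.
final class RegistryUrlSketch {
    final String protocol;
    final String address;
    final Map<String, String> parameters;

    RegistryUrlSketch(String protocol, String address, Map<String, String> parameters) {
        this.protocol = protocol;
        this.address = address;
        this.parameters = Collections.unmodifiableMap(new LinkedHashMap<>(parameters));
    }

    // Returns a new instance whose protocol comes from the "registry" parameter.
    RegistryUrlSketch toConcreteRegistry() {
        Map<String, String> copy = new LinkedHashMap<>(parameters);
        String registry = copy.remove("registry");
        // Assumption: "dubbo" plays the role of the default registry name.
        return new RegistryUrlSketch(registry == null ? "dubbo" : registry, address, copy);
    }

    static String demo() {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("application", "demo-consumer");
        params.put("group", "dubbo_test");
        params.put("registry", "zookeeper");
        RegistryUrlSketch converted =
                new RegistryUrlSketch("registry", "localhost:2181", params).toConcreteRegistry();
        return converted.protocol + "://" + converted.address + " " + converted.parameters;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Returning a new object instead of mutating the old one matches the chained style of url.setProtocol(...).removeParameter(...) in the code above.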
Next the Registry is obtained. Here registryFactory is a RegistryFactory$Adaptive instance.
public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {
public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
// zookeeper
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
return extension.getRegistry(arg0);
}
}
Here extName is zookeeper. Next, the getRegistry method that ZookeeperRegistryFactory inherits from its parent class AbstractRegistryFactory executes:
// url: the registry address, must not be null, zookeeper://....
@Override
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
// string form of the registry url, used as the cache key
String key = url.toServiceString();
// lock the lookup so that each key maps to a single registry instance
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// ZookeeperRegistry
registry = createRegistry(url); // create the ZookeeperRegistry instance
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// release the lock
LOCK.unlock();
}
}
The processed url is:
zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&interface=com.alibaba.dubbo.registry.RegistryService&pid=19009&timestamp=1551527302398
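The caching pattern in getRegistry, one registry per key with creation guarded by a lock, can be sketched as follows. RegistryCacheSketch, its factory function and the created counter are hypothetical; the key is a plain String standing in for url.toServiceString().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical cache: at most one registry object is created per key.
class RegistryCacheSketch<R> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, R> registries = new HashMap<>();
    private final Function<String, R> factory;
    int created = 0; // exposed only so the demo can count factory calls

    RegistryCacheSketch(Function<String, R> factory) {
        this.factory = factory;
    }

    R getRegistry(String key) {
        lock.lock();
        try {
            R registry = registries.get(key);
            if (registry != null) {
                return registry;
            }
            registry = factory.apply(key);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            created++;
            registries.put(key, registry);
            return registry;
        } finally {
            lock.unlock(); // always release, even when creation fails
        }
    }
}

class RegistryCacheDemo {
    static String demo() {
        RegistryCacheSketch<Object> cache = new RegistryCacheSketch<>(key -> new Object());
        String key = "zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService";
        Object first = cache.getRegistry(key);
        Object second = cache.getRegistry(key);
        return (first == second) + ", created " + cache.created;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```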
Next, ZookeeperRegistryFactory.createRegistry(URL url) is called:
// Connect to the Zookeeper registry
// url: zookeeper://...
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
Here zookeeperTransporter is a ZookeeperTransporter$Adaptive instance.
// url: the registry url, zookeeper://....
// zookeeperTransporter: used to connect to zk, a ZookeeperTransporter$Adaptive instance
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// ZK root node
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// connect to zk here
zkClient = zookeeperTransporter.connect(url);
// add a listener for zk connection state changes
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover(); // restore registrations and subscriptions
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
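The super(url) call invokes the constructor of FailbackRegistry, the parent class of ZookeeperRegistry (which starts the scheduled retry task for failed registrations, unregistrations, subscriptions, unsubscriptions and notifications), and the constructor of AbstractRegistry (which writes information to the properties file and issues the corresponding notifications; since there is no subscribed url or notify listener yet, nothing happens here).
It then connects to the Zookeeper registry and obtains a ZookeeperClient (a ZkclientZookeeperClient instance), and finally adds a ZK connection state listener that restores registrations and subscriptions once the ZK connection has been re-established.
When zookeeperTransporter.connect(url) executes, extName in that method is zkclient. ZkclientZookeeperTransporter.connect then runs:
// Connect to zookeeper using Zkclient and return the zk client
// url: the registry url, zookeeper://....
// returns a ZkclientZookeeperClient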
@Override
public ZookeeperClient connect(URL url) {
return new ZkclientZookeeperClient(url);
}
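// Connect to zookeeper using Zkclient
// url: the registry url, zookeeper://...
ZkclientZookeeperClient(URL url) {
super(url);
// create the zk connection task (a ListenableFutureTask), wrapped in a ZkClientWrapper instance
client = new ZkClientWrapper(url.getBackupAddress(), 30000);
// add a listener to the ListenableFutureTask: once the task completes, that is, after ZkClient has connected to zookeeper, register a zk connection state listener for disconnected / connected / reconnected events
// (in practice only the reconnected event is handled, and its handler is the StateListener in the ZookeeperRegistry constructor that calls recover())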
client.addListener(new IZkStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
if (state == KeeperState.Disconnected) {
stateChanged(StateListener.DISCONNECTED);
} else if (state == KeeperState.SyncConnected) {
stateChanged(StateListener.CONNECTED);
}
}
@Override
public void handleNewSession() throws Exception {
stateChanged(StateListener.RECONNECTED);
}
});
client.start(); // start the zk connection task and wait for the ZkClient instance
}
Here client is a ZkClientWrapper instance. Look at ZkClientWrapper and its start() method:
private ListenableFutureTask<ZkClient> listenableFutureTask;
// Constructor, called as new ZkClientWrapper(url.getBackupAddress(), 30000)
// creates the zk connection task
// serverAddr: zk cluster address
// timeout: connection timeout
public ZkClientWrapper(final String serverAddr, long timeout) {
this.timeout = timeout; // connection timeout
// create the zk connection task
listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
@Override
public ZkClient call() throws Exception {
// connect to zk
return new ZkClient(serverAddr, Integer.MAX_VALUE);
}
});
}
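// Add a listener to the ListenableFutureTask: once the task completes, that is, after ZkClient has connected to zookeeper, register the zk connection state listener
// listener: the zk connection state listener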
public void addListener(final IZkStateListener listener) {
listenableFutureTask.addListener(new Runnable() {
@Override
public void run() {
try {
client = listenableFutureTask.get(); // get the task result, i.e. the ZkClient instance
client.subscribeStateChanges(listener); // register the zk connection state listener
} catch (InterruptedException e) {
logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly, which may cause unpredictable exception!");
} catch (ExecutionException e) {
logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e);
}
}
});
}
// Connect to zk and store the resulting ZkClient instance in the client field
public void start() {
if (!started) {
// start a separate thread
Thread connectThread = new Thread(listenableFutureTask);
connectThread.setName("DubboZkclientConnector");
connectThread.setDaemon(true);
connectThread.start();
try {
// wait for the result, up to the timeout
client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
// on timeout (or any other failure) only log the error; nothing is rethrown
logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
}
started = true;
} else {
logger.warn("Zkclient has already been started!");
}
}
This connects to Zookeeper and obtains the ZkClient instance.
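The connect-with-timeout idea in start() can be reproduced with plain JDK classes: run the connection attempt on a daemon thread and wait for its result for a bounded time. This is a sketch only. TimedConnectSketch is a hypothetical name, a java.util.concurrent.FutureTask replaces the ListenableFutureTask used above, and returning null on failure is a simplification of "log the error and leave client unset".

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedConnectSketch {
    // Runs the connector on a daemon thread and waits at most timeoutMillis for it.
    // Returns null when the connector fails or does not finish in time.
    static <C> C connect(Callable<C> connector, long timeoutMillis) {
        FutureTask<C> task = new FutureTask<>(connector);
        Thread connectThread = new Thread(task, "SketchConnector");
        connectThread.setDaemon(true); // a stuck connection must not block JVM exit
        connectThread.start();
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;
        }
    }

    static String demo() {
        Callable<String> fast = () -> "connected";
        Callable<String> slow = () -> {
            Thread.sleep(2000); // simulates a server that does not answer in time
            return "late";
        };
        return connect(fast, 1000) + " / " + connect(slow, 50);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the timed-out task keeps running in the background; the caller just stops waiting for it.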
Afterwards the ZK connection state listener is added. At this point the Registry has been created. Back to the core code of RegistryProtocol.refer:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// url: zookeeper://....
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY))
.removeParameter(Constants.REGISTRY_KEY);
// connect to the ZK registry
Registry registry = registryFactory.getRegistry(url);
...
// refer to the service
return doRefer(cluster, registry, type, url);
}
Then the last line executes:
// cluster: Cluster$Adaptive; registry: ZookeeperRegistry; type: interface com.alibaba.dubbo.demo.DemoService
// url: zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=19549&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&register.ip=172.16.132.166&side=consumer&timestamp=1551529753690&timestamp=1551529755848
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// the registry directory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry); // set the registry
directory.setProtocol(protocol); // set the protocol
// all attributes under REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// temporary url: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&side=consumer&timestamp=1551529753690
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// register the consumer configuration with the registry
// url registered by the consumer: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&side=consumer&timestamp=1551529753690
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false)));
}
// url subscribed by the consumer: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers,cells&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&side=consumer&timestamp=1551529753690
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// a MockClusterInvoker instance
Invoker<T> invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
Overall steps:
- First, create a RegistryDirectory instance;
- Next, register the consumer url with ZK;
- Then subscribe and listen (this is where the first service discovery happens, together with establishing the long-lived connection and creating the Netty client);
- Finally, disguise the RegistryDirectory instance as a MockClusterInvoker instance.
First comes the creation of the RegistryDirectory. The resulting instance:
-->List<Router> routers: [MockInvokersSelector instance]
-->Registry registry: the ZookeeperRegistry instance above (zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&interface=com.alibaba.dubbo.registry.RegistryService&pid=19549&timestamp=1551529755848)
-->String serviceKey: dubbo_test/com.alibaba.dubbo.registry.RegistryService
-->String[] serviceMethods: [sayHello]
-->Class<T> serviceType: interface com.alibaba.dubbo.demo.DemoService
-->URL url: zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=19549&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&register.ip=172.16.132.166&side=consumer&timestamp=1551529753690&timestamp=1551529755848
-->URL consumerUrl: zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=19549&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&register.ip=172.16.132.166&side=consumer&timestamp=1551529753690&timestamp=1551529755848
-->URL directoryUrl: zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&register.ip=172.16.132.166&side=consumer&timestamp=1551529753690
-->URL overrideDirectoryUrl: zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&register.ip=172.16.132.166&side=consumer&timestamp=1551529753690
-->Map<String, String> queryMap: {side=consumer, application=demo-consumer, register.ip=172.16.132.166, methods=sayHello, dubbo=2.0.0, pid=19549, check=false, interface=com.alibaba.dubbo.demo.DemoService, timestamp=1510225913509}
The List<Router> routers field is created in AbstractDirectory, the parent class of RegistryDirectory, as follows:
public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
this.url = url;
this.consumerUrl = consumerUrl;
setRouters(routers);
}
protected void setRouters(List<Router> routers) {
// copy list
routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
// append url router
String routerKey = url.getParameter(Constants.ROUTER_KEY);
if (routerKey != null && routerKey.length() > 0) {
RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerKey);
routers.add(routerFactory.getRouter(url));
}
// append mock invoker selector
routers.add(new MockInvokersSelector());
Collections.sort(routers);
this.routers = routers;
}
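Next the consumer is registered with the registry, in the same way as a service provider. The call first goes through FailbackRegistry.register, which calls doRegister() of the subclass ZookeeperRegistry; if registration fails, the url is added to the failed-registration list (and is re-registered later by the background retry task).
// url: the consumer url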
public void register(URL url) {
if (destroyed.get()){
return;
}
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// send the registration request to the server
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// if check-on-startup is enabled, throw the exception directly
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
// skip the failback retry
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// record the failed registration in the failed list for scheduled retry
failedRegistered.add(url);
}
}
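The failback idea, try once, remember what failed, and retry later, fits in a few lines. The sketch below is an illustration under simplifying assumptions: FailbackRegistrySketch and its doRegister hook are hypothetical, urls are plain strings, and retry() is called by hand where the real registry relies on a scheduled background task. For consumer urls the check flag in the code above evaluates to false, which is the path the demo exercises.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

class FailbackRegistrySketch {
    private final Set<String> failedRegistered = new LinkedHashSet<>();
    // Hypothetical hook that performs the actual registration and
    // throws a RuntimeException when the registry cannot be reached.
    private final Consumer<String> doRegister;

    FailbackRegistrySketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    void register(String url, boolean check) {
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                // fail fast when startup checking is requested
                throw new IllegalStateException("Failed to register " + url, e);
            }
            failedRegistered.add(url); // remember it for a later retry
        }
    }

    // One retry round over a snapshot of the failed urls.
    void retry() {
        for (String url : new ArrayList<>(failedRegistered)) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // still failing: keep the url for the next round
            }
        }
    }

    int failedCount() {
        return failedRegistered.size();
    }
}

class FailbackRegistryDemo {
    static String demo() {
        int[] calls = {0};
        FailbackRegistrySketch registry = new FailbackRegistrySketch(url -> {
            if (calls[0]++ == 0) {
                throw new RuntimeException("registry unreachable"); // first attempt fails
            }
        });
        registry.register("consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService", false);
        int afterFailure = registry.failedCount();
        registry.retry();
        return afterFailure + " -> " + registry.failedCount();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```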
Finally, look at the doRegister method of ZookeeperRegistry:
// url: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
This creates an ephemeral node on zk at the path returned by toUrlPath(url):
/dubbo_test/com.alibaba.dubbo.demo.DemoService/consumers/consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
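How such a node path is assembled can be sketched as root, service interface, category, then the url itself as the last segment. ZkPathSketch and its parameters are hypothetical. The sketch URL-encodes the last segment on the assumption that a full url cannot be stored as a raw zk node name, since it contains slashes; the path shown above is the decoded, human-readable form.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ZkPathSketch {
    // Builds /<group>/<service>/<category>/<encoded url>.
    static String toUrlPath(String group, String serviceInterface, String category, String fullUrl) {
        // Same normalization as in the ZookeeperRegistry constructor: the root starts with "/".
        String root = group.startsWith("/") ? group : "/" + group;
        return root + "/" + serviceInterface + "/" + category + "/" + encode(fullUrl);
    }

    // Assumption: the url segment is UTF-8 URL-encoded before being used as a node name.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    static String demo() {
        return toUrlPath("dubbo_test", "com.alibaba.dubbo.demo.DemoService",
                "consumers", "consumer://172.16.132.166/x?a=b");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```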
At this point the consumer registration is complete. Next, directory.subscribe performs the subscription. RegistryDirectory.subscribe(URL url):
// Service consumer subscription
// url: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers,cells&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
public void subscribe(URL url) {
setConsumerUrl(url);
// this RegistryDirectory instance acts as the listener; the call goes to FailbackRegistry.subscribe
registry.subscribe(url, this);
}
Core code of FailbackRegistry.subscribe(URL url, NotifyListener listener):
public void subscribe(URL url, NotifyListener listener) {
if (destroyed.get()){
return;
}
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// send the subscription request to the server
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
// fetch the cached data for the subscribed url
List<URL> urls = getCacheUrls(url);
if (urls != null && urls.size() > 0) {
// send the notification
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// if check-on-startup is enabled, throw the exception directly
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// record the failed subscription in the failed list for scheduled retry
addFailedSubscribed(url, listener);
}
}
ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener):
// subscribed url: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
// listener: the RegistryDirectory instance
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
...
} else {
// execution reaches this branch
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
The for loop here runs 3 times, once for each category path:
- /dubbo_test/com.alibaba.dubbo.demo.DemoService/providers
- /dubbo_test/com.alibaba.dubbo.demo.DemoService/configurators
- /dubbo_test/com.alibaba.dubbo.demo.DemoService/routers
After the for loop finishes, the state is as follows.
ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners:
{
consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
=
{RegistryDirectory instance=anonymous inner ChildListener instance in ZookeeperRegistry}
}
List<URL> urls (3 elements):
[
dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18594&side=provider&timestamp=1551525684453,
empty://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219,
empty://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=routers&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
]
Note: the first element comes from the call List<String> children = zkClient.addChildListener(path, zkListener), which returns the nodes currently under path (this is in fact the first service discovery).
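The two empty:// entries in that list follow a simple rule that can be sketched on its own: every subscribed category yields a path, and a category without children is represented by a placeholder url with the empty protocol that carries the category name. EmptyFallbackSketch and its string-based urls are hypothetical, and the sketch leaves out the matching of children against the consumer url that the real toUrlsWithEmpty performs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class EmptyFallbackSketch {
    // One zk path per subscribed category.
    static List<String> toCategoriesPath(String root, String service, String categories) {
        List<String> paths = new ArrayList<>();
        for (String category : categories.split(",")) {
            paths.add(root + "/" + service + "/" + category);
        }
        return paths;
    }

    // Returns the children as-is, or a single empty:// placeholder when there are none.
    static List<String> toUrlsWithEmpty(String consumerHost, String service,
                                        String category, List<String> children) {
        if (children == null || children.isEmpty()) {
            List<String> placeholder = new ArrayList<>();
            placeholder.add("empty://" + consumerHost + "/" + service + "?category=" + category);
            return placeholder;
        }
        return new ArrayList<>(children);
    }

    static String demo() {
        String service = "com.alibaba.dubbo.demo.DemoService";
        List<String> paths = toCategoriesPath("/dubbo_test", service, "providers,configurators,routers");
        List<String> routers = toUrlsWithEmpty("172.16.132.166", service, "routers",
                Collections.<String>emptyList());
        List<String> providers = toUrlsWithEmpty("172.16.132.166", service, "providers",
                Arrays.asList("dubbo://172.16.132.166:20881/" + service));
        return paths.size() + " | " + routers.get(0) + " | " + providers.get(0);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The placeholder matters downstream: a listener can tell "this category is now empty" apart from "nothing was reported for this category".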
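Execution then continues to AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls):
// Given the subscribed url, find the matching changed urls and trigger the corresponding listener
// @param url the subscribed url
// @param listener the notify listener for the subscribed url
// @param urls the current data after the change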
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if (urls == null) {
throw new IllegalArgumentException("notify urls == null");
}
if (urls.size() == 0 && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// <category, list of matching changed urls>
Map<String, List<URL>> result = new HashMap<String, List<URL>>(16);
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
// <category, matching changed urls>
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>(16));
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// save the data to local disk
saveProperties(url);
// notify the change
listener.notify(categoryList); // this is where the RegistryDirectory listener is notified
}
}
First, a for loop classifies the incoming url list by category. The result is:
Map<String, List<URL>> result:
{
configurators=[
empty://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
],
routers=[
empty://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=routers&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
],
providers=[
dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18594&side=provider&timestamp=1551525684453
]
}
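The classification step is an ordinary group-by on the category parameter, with urls that carry no category falling into providers (which is why the dubbo:// url above lands there). A minimal sketch, assuming urls are plain strings and using the hypothetical names CategoryGroupingSketch, categoryOf and groupByCategory:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CategoryGroupingSketch {
    // Reads the category query parameter; urls without one count as providers.
    static String categoryOf(String url) {
        int query = url.indexOf('?');
        if (query >= 0) {
            for (String pair : url.substring(query + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return "providers";
    }

    // Groups urls by category, keeping first-seen order of the categories.
    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String url : urls) {
            result.computeIfAbsent(categoryOf(url), key -> new ArrayList<>()).add(url);
        }
        return result;
    }

    static String demo() {
        List<String> urls = Arrays.asList(
                "dubbo://172.16.132.166:20881/DemoService?side=provider",
                "empty://172.16.132.166/DemoService?category=configurators&side=consumer",
                "empty://172.16.132.166/DemoService?category=routers&side=consumer");
        return groupByCategory(urls).keySet().toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```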
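Then the second for loop iterates over result, saving to file and notifying for each entry. The first two entries do nothing essential, so go straight to the notification for the providers entry, handled by RegistryDirectory.notify(List<URL> urls). Here urls is the providers value above.
// service data changes are delivered as full notifications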
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol(); // protocol
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); // category
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: "
+ url + " from registry " + getUrl().getAddress() + " to consumer "
+ NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && configuratorUrls.size() > 0) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && routerUrls.size() > 0) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
refreshInvoker(invokerUrls);
}
This first stores the incoming provider url in the invokerUrls list and then calls refreshInvoker(invokerUrls).
// Map<url of the service referenced by the consumer, RegistryDirectory.InvokerDelegete>
private volatile Map<String, Invoker<T>> urlInvokerMap;
// Map<methodName, List<RegistryDirectory.InvokerDelegete>>
private volatile Map<String, List<Invoker<T>>> methodInvokerMap;
// refresh the invoker maps
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // forbid access
this.methodInvokerMap = null; // clear the map
destroyAllInvokers(); // destroy all invokers
} else {
this.forbidden = false; // allow access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls); // cache the invokerUrls list for later comparison
}
if (invokerUrls.size() == 0) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); // convert the URL list into an Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // map each method name to its Invoker list
// state change
// if the conversion failed, do nothing
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
// multiple groups
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // destroy invokers that are no longer used
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
// convert urls into invokers
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.size() == 0) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// if the reference side configured protocol, only select providers with a matching protocol
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
// the provider url does not match; skip to the next one
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// check whether the protocol of the provider url is supported
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
+ ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
// providerUrl: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18594&side=provider&timestamp=1551525684453
// url: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=20455&register.ip=172.16.132.166&remote.timestamp=1551525684453&side=consumer&timestamp=1551534134492
URL url = mergeUrl(providerUrl); // merge consumer-side and other parameters into the provider url
// key: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=20455&register.ip=172.16.132.166&remote.timestamp=1551525684453&side=consumer&timestamp=1551534134492
String key = url.toFullString(); // URL parameters are sorted (this key is the string form of the merged url)
if (keys.contains(key)) { // duplicate URL
continue;
}
keys.add(key);
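// The cache key is described as the URL without consumer parameters merged in; note that in this code the key is taken from the merged url above. Either way, if the provider URL changes, refer is performed again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // not in the cache, so refer again
try {
boolean enabled = true; // the disabled/enabled parameters control whether to refer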
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// serviceType: Class<com.alibaba.dubbo.demo.DemoService>
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // put the new reference into the cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
This loops over urls (the providerUrls). protocol is a Protocol$Adaptive instance, so the call again goes through ProtocolListenerWrapper->ProtocolFilterWrapper->DubboProtocol. Here is the ProtocolFilterWrapper part:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// registry protocol
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// protocol.refer(type, url): a DubboInvoker instance
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY,
Constants.CONSUMER);
}
The two constants are reference.filter and consumer. Finally, look at DubboProtocol.refer:
// Refer the service
// @param serviceType business interface com.alibaba.dubbo.demo.DemoService
// @param url URL of the remote service (the provider URL after consumer parameters have been merged in): dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=20455&register.ip=172.16.132.166&remote.timestamp=1551525684453&side=consumer&timestamp=1551534134492
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
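getClients runs first: it creates the Netty client and establishes the long-lived connection between client and server. The result is wrapped in a DubboInvoker and returned. After the return, the DubboInvoker instance is wrapped in the Filter chain, the filtered DubboInvoker is wrapped in a ListenerInvokerWrapper, and finally the ListenerInvokerWrapper instance is wrapped in an InvokerDelegete. That InvokerDelegete instance is put into the newUrlInvokerMap cache, which completes the logic of toInvokers(List<URL> urls). newUrlInvokerMap is then converted into the Map<String, List<Invoker<T>>> newMethodInvokerMap cache, which completes the logic of refreshInvoker(List<URL> invokerUrls). Once that is done, the subscription notification has been fully processed.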
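The two gating decisions at the top of the toInvokers loop (does the provider protocol match the consumer's protocol list, and is the merged URL enabled) can be isolated into a small sketch. ReferDecision and its methods are hypothetical names, and a plain Map stands in for the URL parameters, so this is a simplification of the logic rather than Dubbo's code.

```java
import java.util.Map;

// Hypothetical helper; only the decision logic mirrors the loop in toInvokers.
final class ReferDecision {

    // True when no protocol filter is configured, or when the provider
    // protocol appears in the comma-separated list configured by the consumer.
    static boolean accepts(String queryProtocols, String providerProtocol) {
        if (queryProtocols == null || queryProtocols.isEmpty()) {
            return true;
        }
        for (String accepted : queryProtocols.split(",")) {
            if (providerProtocol.equals(accepted)) {
                return true;
            }
        }
        return false;
    }

    // "disabled" takes precedence when present; otherwise "enabled" is
    // consulted and defaults to true. A Map stands in for URL parameters.
    static boolean enabled(Map<String, String> params) {
        if (params.containsKey("disabled")) {
            return !Boolean.parseBoolean(params.get("disabled"));
        }
        return Boolean.parseBoolean(params.getOrDefault("enabled", "true"));
    }

    public static void main(String[] args) {
        System.out.println(accepts("dubbo,rmi", "dubbo"));
        System.out.println(enabled(Map.of("disabled", "true")));
    }
}
```

Only a provider URL that passes both checks reaches protocol.refer; everything else is skipped before any connection is attempted.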
Now look at getClients(url):
// Get the ExchangeClient instances for the provider URL (after consumer parameters have been merged in)
private ExchangeClient[] getClients(URL url) {
// Whether to share the (TCP) connection
boolean serviceShareConnect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// If connections is not configured, share one connection; otherwise each service uses its own connections
if (connections == 0) {
serviceShareConnect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (serviceShareConnect) {
// shared
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
// Get the shared TCP connection
private ExchangeClient getSharedClient(URL url) {
String key = url.getAddress(); // provider ip:port
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
synchronized (key.intern()) { // intern() returns the canonical String from the string pool (adding it first if absent), so equal keys lock on the same object
ExchangeClient exchangeClient = initClient(url); // initialize
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
}
// Create a new TCP connection.
private ExchangeClient initClient(URL url) {
// client type setting (netty here)
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// protocol codec
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// heartbeat is enabled by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO has serious performance problems and is not allowed for now
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// If the connection is configured as lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
// create a lazy client; the TCP connection is only established when the first request is sent
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// otherwise establish the TCP connection right away and return a HeaderExchangeClient instance
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
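Note: because a shared connection is used here, only one long-lived TCP connection is established between a consumer and a given provider address (the cache key is the provider's ip:port). A connection count can also be configured, in which case multiple connections are created.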
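A minimal sketch of the shared-connection idea: one reference-counted client per provider address, reused by every service that refers to that address. SharedConnections and CountedClient are hypothetical names, and the sketch relies on ConcurrentHashMap.compute for per-key atomicity instead of the synchronized (key.intern()) block used in the code above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the referenceClientMap bookkeeping.
final class SharedConnections {

    // Stand-in for a client that owns one TCP connection.
    static final class CountedClient {
        final String address;
        final AtomicInteger refCount = new AtomicInteger(1);
        volatile boolean closed;

        CountedClient(String address) {
            this.address = address;
        }
    }

    private final ConcurrentMap<String, CountedClient> clients = new ConcurrentHashMap<>();
    // Counts how many "connections" were actually opened.
    final AtomicInteger connectionsOpened = new AtomicInteger();

    CountedClient getShared(String address) {
        // compute() runs atomically per key, so two threads asking for the
        // same address cannot both open a connection.
        return clients.compute(address, (key, current) -> {
            if (current != null && !current.closed) {
                current.refCount.incrementAndGet(); // reuse and count the new user
                return current;
            }
            connectionsOpened.incrementAndGet();
            return new CountedClient(key); // first user, or previous client was closed
        });
    }

    public static void main(String[] args) {
        SharedConnections pool = new SharedConnections();
        pool.getShared("172.16.132.166:20881");
        pool.getShared("172.16.132.166:20881");
        System.out.println(pool.connectionsOpened.get());
    }
}
```

The reference count is what lets several invokers release the same client independently; the underlying connection only needs to be closed when the last user is gone.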
Execution eventually reaches HeaderExchanger.connect(URL url, ExchangeHandler handler):
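// Connect to a server: Transporters.connect() connects to the server and returns a NettyClient instance
// @param url provider URL after merging consumer and other parameters
// @param handler the request handler: private final ExchangeHandler requestHandler = new ExchangeHandlerAdapter(){}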
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// Transporters.connect() --> NettyClient
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Next, Transporters.connect runs:
// Connect to a server and return a NettyClient
// @param url provider URL after merging consumer and other parameters
// @param handlers the DecodeHandler instance
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
}
Next, NettyTransporter.connect runs:
// Connect to a server
// @param url server url: provider URL after merging consumer and other parameters
// @param listener the DecodeHandler instance
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
// NettyClient initialization
// @param url provider URL after merging consumer and other parameters
// @param handler the DecodeHandler instance
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// wrapChannelHandler(url, handler) --> MultiMessageHandler instance
super(url, wrapChannelHandler(url, handler));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
// url: the merged URL, plus the thread name and thread pool type key/value pairs
return ChannelHandlers.wrap(handler, url);
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url)));
}
The handler is wrapped further here. As on the provider side, there are six layers in total:
MultiMessageHandler
-->HeartbeatHandler
-->AllChannelHandler
-->DecodeHandler
-->HeaderExchangeHandler
-->ExchangeHandlerAdapter instance (an anonymous inner class in DubboProtocol)
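The six layers form a plain decorator chain: each handler holds the next one and delegates to it. The sketch below reproduces only the nesting order listed above; Handler, Named and the trace list are hypothetical scaffolding, and none of the real handlers' behaviour (decoding, heartbeats, thread dispatch) is modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the nesting order only.
final class HandlerChainSketch {

    interface Handler {
        void received(String message, List<String> trace);
    }

    // A layer that records its name and then delegates to the wrapped handler.
    static final class Named implements Handler {
        private final String name;
        private final Handler next; // null for the innermost handler

        Named(String name, Handler next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void received(String message, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.received(message, trace);
            }
        }
    }

    // Built inside-out, in the same order the wrapping happens in the walkthrough.
    static Handler build() {
        Handler h = new Named("ExchangeHandlerAdapter", null);
        h = new Named("HeaderExchangeHandler", h);
        h = new Named("DecodeHandler", h);
        h = new Named("AllChannelHandler", h);
        h = new Named("HeartbeatHandler", h);
        h = new Named("MultiMessageHandler", h);
        return h;
    }

    // Sends one message through the chain and returns the visiting order.
    static List<String> trace() {
        List<String> trace = new ArrayList<>();
        build().received("ping", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```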
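Next, the constructor of the parent class AbstractClient is called:
// @param url provider URL after merging consumer and other parameters (without the thread name and thread pool type key/value pairs)
// @param handler the MultiMessageHandler instance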
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// whether to send a reconnect, true/false (purpose unclear)
sendReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
// shutdown timeout: 1000 * 60 * 15 ms (15 minutes)
shutdownTimeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
// the default reconnect interval is 2s, so 1800 means one warning per hour: 1800 * 2s = 3600s = 1 hour
reconnectWarningPeriod = url.getParameter("reconnect.warning.period", 1800);
try {
doOpen(); // actually initializes the netty client
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
connect(); // the netty client connects to the netty server
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
// get the thread pool
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
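Then the constructors of the parent classes AbstractEndpoint and AbstractPeer are called:
// codec: DubboCountCodec
private Codec2 codec;
// timeout for remote service invocation
private int timeout;
// connect timeout
private int connectTimeout;
// @param url provider URL after merging consumer and other parameters (without the thread name and thread pool type key/value pairs)
// @param handler the MultiMessageHandler instance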
public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
this.codec = getChannelCodec(url); // dubbo
this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // RPC invocation timeout, 1s by default
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); // connect timeout, 3s by default
}
private final ChannelHandler handler; // the MultiMessageHandler instance
private volatile URL url; // provider URL after merging consumer parameters
// @param url provider URL after merging consumer and other parameters (without the thread name and thread pool type key/value pairs)
// @param handler the MultiMessageHandler instance
public AbstractPeer(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
After the series of assignments above, the Netty client is opened:
protected void doOpen() throws Throwable {
// set the logger factory
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory); // Netty client bootstrap class
// config: TCP options of the Channel
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true); // enable TCP keep-alive probing
bootstrap.setOption("tcpNoDelay", true); // disable Nagle's algorithm for lower latency
bootstrap.setOption("connectTimeoutMillis", getTimeout()); // connect timeout (note: the value passed here is getTimeout(), not getConnectTimeout())
// ChannelHandler that handles inbound and outbound events
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
// constructor arguments: <DubboCountCodec, provider URL after merging consumer parameters, ChannelHandler (the NettyClient instance)>
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder()); // decoder
pipeline.addLast("encoder", adapter.getEncoder()); // encoder
pipeline.addLast("handler", nettyHandler); // client logic handler for inbound and outbound events
return pipeline;
}
});
}
Then the client connects to the Netty server:
// connect to the server
protected void connect() throws RemotingException {
connectLock.lock();
try {
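if (isConnected()) { // already connected to the server?
return;
}
// periodically check the connection state and reconnect if it has dropped (scheduled task)
initConnectStatusCheckCommand();
// connect to the server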
doConnect();
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress()
+ " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
+ " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successful connect to server " + getRemoteAddress()
+ " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
+ " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
reconnectCount.set(0); // reset the reconnect count to 0
reconnectErrorLogFlag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress()
+ " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
+ " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
}
}
// connect to the server
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis(); // connect start time
// getConnectAddress() returns the server address
ChannelFuture future = bootstrap.connect(getConnectAddress()); // asynchronous connect; returns a future
try {
// wait for the connection to be established within the given timeout
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
// the connection was established within the timeout
if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE); // register interest in read and write events on the Channel
try {
// close the old connection
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
// remove the channel from the cache
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
// has the client been closed?
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
// assign the new channel to the member variable channel
NettyClient.this.channel = newChannel;
}
}
} else if (future.getCause() != null) {
// the connection attempt failed with an exception
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + ", error message is: " + future.getCause().getMessage(), future.getCause());
} else {
// the connection attempt timed out
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
// cancel the connection attempt
future.cancel();
}
}
}
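doConnect distinguishes three outcomes of the asynchronous connect: success within the timeout, failure with a cause, and timeout (after which the pending attempt is cancelled). The sketch below shows the same three-way split with java.util.concurrent.CompletableFuture standing in for Netty's ChannelFuture. ConnectWaitSketch and the returned status strings are hypothetical, and unlike awaitUninterruptibly this wait can be interrupted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; CompletableFuture replaces Netty's ChannelFuture.
final class ConnectWaitSketch {

    // Waits for the pending connect and reports which of the outcomes occurred.
    static String await(CompletableFuture<String> future, long timeoutMillis) {
        try {
            String channel = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "connected:" + channel; // established within the timeout
        } catch (TimeoutException e) {
            future.cancel(true); // give up on the pending attempt
            return "timeout";
        } catch (ExecutionException e) {
            return "failed:" + e.getCause().getMessage(); // attempt failed with a cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            future.cancel(true);
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(await(CompletableFuture.completedFuture("channel-1"), 100));
        System.out.println(await(new CompletableFuture<>(), 50));
    }
}
```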
At this point the NettyClient has been created. The client is then wrapped in a HeaderExchangeClient.
// heartbeat thread pool
private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
// the NettyClient instance
private final Client client;
// the HeaderExchangeChannel instance
private final ExchangeChannel channel;
// heartbeat timer
private ScheduledFuture<?> heatbeatTimer;
// heartbeat settings in milliseconds; the default of 0 means no heartbeat is performed
private int heartbeat; // heartbeat interval
private int heartbeatTimeout; // heartbeat timeout
// client: the NettyClient instance, needHeartbeat: true
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client; // the NettyClient instance
this.channel = new HeaderExchangeChannel(client);
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); // dubbo protocol version
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
if (needHeartbeat) {
// start heartbeat checking
startHeatbeatTimer();
}
}
This starts the heartbeat.
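The relationship between the two heartbeat values in the constructor above is simple arithmetic: when no timeout is configured it defaults to three times the interval, and a configured timeout shorter than twice the interval is rejected. A minimal sketch, where HeartbeatSettings is a hypothetical class and the 60000 ms interval in main is an assumed value chosen only for illustration:

```java
// Hypothetical holder for the two heartbeat values.
final class HeartbeatSettings {
    final int heartbeat;        // heartbeat interval in milliseconds
    final int heartbeatTimeout; // heartbeat timeout in milliseconds

    // configuredTimeout == null means "not configured".
    HeartbeatSettings(int heartbeat, Integer configuredTimeout) {
        this.heartbeat = heartbeat;
        // default: three heartbeat intervals
        this.heartbeatTimeout = configuredTimeout != null ? configuredTimeout : heartbeat * 3;
        // same validation as in the constructor shown above
        if (this.heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
    }

    public static void main(String[] args) {
        // 60000 ms is an assumed interval, not a value taken from the walkthrough.
        HeartbeatSettings settings = new HeartbeatSettings(60000, null);
        System.out.println(settings.heartbeatTimeout);
    }
}
```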
Finally, the HeaderExchangeClient instance is wrapped in a ReferenceCountExchangeClient:
// @param client the HeaderExchangeClient instance
// @param ghostClientMap
public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
this.client = client;
refenceCount.incrementAndGet();
this.url = client.getUrl();
if (ghostClientMap == null) {
throw new IllegalStateException("ghostClientMap can not be null, url: " + url);
}
this.ghostClientMap = ghostClientMap;
}
It is then stored in the Map<String, ReferenceCountExchangeClient> referenceClientMap cache, and the ReferenceCountExchangeClient is wrapped into the DubboInvoker. The DubboInvoker at this point looks like this:
-->Map<String, String> attachment: {interface=com.alibaba.dubbo.demo.DemoService}
-->ExchangeClient[] clients: [ReferenceCountExchangeClient instance] // if multiple connections are configured, there are multiple clients here
-->Class<T> type: interface com.alibaba.dubbo.demo.DemoService
-->URL url: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=20455&register.ip=172.16.132.166&remote.timestamp=1551525684453&side=consumer&timestamp=1551534134492
Next, the DubboInvoker instance is wrapped in the Filter chain:
ConsumerContextFilter->FutureFilter->MonitorFilter->DubboInvoker.
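A chain such as the one above can be assembled by walking the filter list from last to first, so that the first filter ends up outermost. The sketch below illustrates that construction with simplified, hypothetical Invoker and Filter interfaces (strings instead of Invocation/Result); it is not the actual buildInvokerChain code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of wrapping an invoker in a filter chain.
final class FilterChainSketch {

    interface Invoker {
        String invoke(String request, List<String> trace);
    }

    interface Filter {
        String invoke(Invoker next, String request, List<String> trace);
    }

    // A filter that records its name and passes the call on unchanged.
    static Filter named(String name) {
        return (next, request, trace) -> {
            trace.add(name);
            return next.invoke(request, trace);
        };
    }

    // Wraps target so that filters.get(0) runs first and target runs last.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = (request, trace) -> filter.invoke(next, request, trace);
        }
        return last;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Invoker chain = buildChain(
                (request, t) -> { t.add("DubboInvoker"); return "Hello " + request; },
                Arrays.asList(named("ConsumerContextFilter"), named("FutureFilter"), named("MonitorFilter")));
        System.out.println(chain.invoke("world", trace) + " via " + trace);
    }
}
```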
The filtered Invoker is then wrapped in a ListenerInvokerWrapper, which in turn is wrapped in an InvokerDelegete. The ultimate goal of all this is to initialize two fields of RegistryDirectory:
Map<String, List<Invoker<T>>> methodInvokerMap={
sayHello=[RegistryDirectory$InvokerDelegete instance], *=[RegistryDirectory$InvokerDelegete instance]}
Map<String, Invoker<T>> urlInvokerMap={dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21870&register.ip=192.168.0.101&remote.timestamp=1551540659260&side=consumer&timestamp=1551541271221
=RegistryDirectory$InvokerDelegete instance}
At this point the subscription is complete. Now look at the last line of RegistryProtocol.doRefer: return cluster.join(directory). Here cluster is a Cluster$Adaptive instance:
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover"); // failover
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}
Here extName="failover", and AOP-style wrapping takes place: MockClusterWrapper wraps FailoverCluster.
public class MockClusterWrapper implements Cluster {
private Cluster cluster; // the FailoverCluster instance
public MockClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
}
Here cluster is the FailoverCluster instance.
// Failover: when a call fails, retry on another server. Usually used for read operations; retries add latency.
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
This simply creates a FailoverClusterInvoker instance, whose fields are stored by its parent class AbstractClusterInvoker.
Finally a MockClusterInvoker instance is created:
private final Directory<T> directory; // the RegistryDirectory instance
private final Invoker<T> invoker; // the FailoverClusterInvoker instance
public MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
this.directory = directory;
this.invoker = invoker;
}
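At this point, the line marked ###1 in the code below has finished. The result is a MockClusterInvoker instance with directory=the RegistryDirectory instance and invoker=a FailoverClusterInvoker instance (which itself holds a Directory<T> directory field pointing at the same RegistryDirectory instance).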
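The failover idea named in the FailoverCluster comment (on failure, retry against another server) can be sketched as a loop over candidate invokers. This is a deliberately simplified illustration: FailoverSketch is a hypothetical class, the retries parameter and the round-robin choice of the next candidate are assumptions, and the real FailoverClusterInvoker obtains its candidates from the Directory rather than from a fixed list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of failover; not FailoverClusterInvoker itself.
final class FailoverSketch {

    // Tries up to retries + 1 times, moving to the next invoker after each failure.
    static <R> R invoke(List<Function<String, R>> invokers, String request, int retries) {
        if (invokers.isEmpty()) {
            throw new IllegalStateException("no invokers available");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            Function<String, R> invoker = invokers.get(attempt % invokers.size());
            try {
                return invoker.apply(request);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next candidate
            }
        }
        throw new IllegalStateException("all attempts failed", last);
    }

    public static void main(String[] args) {
        List<Function<String, String>> invokers = new ArrayList<>();
        invokers.add(request -> { throw new RuntimeException("provider down"); });
        invokers.add(request -> "Hello " + request);
        System.out.println(invoke(invokers, "world", 2));
    }
}
```

The extra latency mentioned in the comment is visible here: a caller only gets an answer after every failed attempt before the successful one has run to completion.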
private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0)); // refer the service ###1
}
...
// create the service proxy
return (T) proxyFactory.getProxy(invoker); // ###2
}
2. Using ProxyFactory to create the proxy
(T) proxyFactory.getProxy(invoker);
The proxyFactory above is a ProxyFactory$Adaptive instance. Calling getProxy(Invoker<T> invoker) on it ends up at a JavassistProxyFactory wrapped by StubProxyFactoryWrapper. Calling JavassistProxyFactory.getProxy(Invoker<T> invoker) first runs the inherited AbstractProxyFactory.getProxy(Invoker<T> invoker) method, which then calls JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] types).
public abstract class AbstractProxyFactory implements ProxyFactory {
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
Class<?>[] interfaces = null; // interfaces implemented by the service proxy class
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
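interfaces[0] = invoker.getInterface(); // business interface
interfaces[1] = EchoService.class; // echo test interface
for (int i = 0; i < types.length; i++) {
interfaces[i + 1] = ReflectUtils.forName(types[i]); // note: as written, i + 1 overwrites interfaces[1] on the first iteration and leaves the last slot null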
}
}
}
if (interfaces == null) { // business interface + echo test interface
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
return getProxy(invoker, interfaces);
}
// Create the service proxy instance
// @param invoker the invoker that executes the call
// @param types interface types
// @param <T> business interface
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
}
// JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] types)
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
- invoker: the MockClusterInvoker instance
- interfaces: [interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
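How the interfaces array is assembled can be sketched as follows: the business interface first, the echo interface second, and any extra interfaces named in a comma-separated parameter after those. ProxyInterfaces is a hypothetical helper that uses Class.forName in place of ReflectUtils.forName, and it places the extra types starting at index 2 so that the echo interface is preserved, which differs from the i + 1 indexing in the quoted loop.

```java
import java.util.Arrays;

// Hypothetical helper; Class.forName replaces ReflectUtils.forName.
final class ProxyInterfaces {

    // Returns [service, echo] plus any extra interfaces named in config.
    static Class<?>[] resolve(Class<?> service, Class<?> echo, String config)
            throws ClassNotFoundException {
        if (config == null || config.isEmpty()) {
            return new Class<?>[]{service, echo};
        }
        String[] types = config.split(",");
        Class<?>[] interfaces = new Class<?>[types.length + 2];
        interfaces[0] = service; // business interface
        interfaces[1] = echo;    // echo test interface
        for (int i = 0; i < types.length; i++) {
            // offset by 2 so that neither of the first two slots is overwritten
            interfaces[i + 2] = Class.forName(types[i].trim());
        }
        return interfaces;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Runnable and Closeable are arbitrary JDK stand-ins for DemoService and EchoService.
        Class<?>[] result = resolve(Runnable.class, java.io.Closeable.class,
                "java.lang.Comparable, java.io.Serializable");
        System.out.println(Arrays.toString(result));
    }
}
```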
Note that Proxy here is not the JDK's Proxy but Dubbo's own.
Proxy.getProxy(interfaces):
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
...
Proxy proxy = null;
...
// create ProxyInstance class: the service proxy class
String pcn = pkg + ".proxy" + id; // class name: com.alibaba.dubbo.common.bytecode.proxy0
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
ccp.addDefaultConstructor();
Class<?> clazz = ccp.toClass();
clazz.getField("methods").set(null, methods.toArray(new Method[0]));
// create Proxy class: the factory class that creates service proxy instances (via newInstance())
String fcn = Proxy.class.getName() + id; // com.alibaba.dubbo.common.bytecode.Proxy0
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
ccm.setSuperClass(Proxy.class);
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
Class<?> pc = ccm.toClass();
proxy = (Proxy) pc.newInstance();
...
return proxy; // return the factory class instance
}
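From the code, two Class objects are generated: pc is the factory class that creates service proxy instances, and clazz is the service proxy class. The proxy that is finally returned is an instance of the com.alibaba.dubbo.common.bytecode.Proxy0 class shown below. Afterwards com.alibaba.dubbo.common.bytecode.Proxy0.newInstance(InvocationHandler invocationHandler) is called to create a com.alibaba.dubbo.common.bytecode.proxy0 instance, whose InvocationHandler handler field is initialized to the InvokerInvocationHandler instance.
In the end two classes are generated:
// com.alibaba.dubbo.common.bytecode.Proxy0: the factory class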
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.Proxy;
import com.alibaba.dubbo.common.bytecode.proxy0;
import java.lang.reflect.InvocationHandler;
public class Proxy0 extends Proxy implements ClassGenerator.DC {
@Override
public Object newInstance(InvocationHandler invocationHandler) {
return new proxy0(invocationHandler); // create the service proxy instance
}
}
// com.alibaba.dubbo.common.bytecode.proxy0: the service proxy class
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.service.EchoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
public static Method[] methods;
private InvocationHandler handler; // the InvokerInvocationHandler instance
public String sayHello(String string) {
Object[] arrobject = new Object[]{string};
Object object = this.handler.invoke(this, methods[0], arrobject);
return (String)object;
}
public Object $echo(Object object) {
Object[] arrobject = new Object[]{object};
Object object2 = this.handler.invoke(this, methods[1], arrobject);
return object2;
}
public proxy0() {
}
public proxy0(InvocationHandler invocationHandler) {
this.handler = invocationHandler;
}
}
The Method[] methods array above actually contains two elements:
[public abstract java.lang.String com.alibaba.dubbo.demo.DemoService.sayHello(java.lang.String),
public abstract java.lang.Object com.alibaba.dubbo.rpc.service.EchoService.$echo(java.lang.Object)]
As shown above, the service proxy object that is finally returned is a proxy0 instance. When its sayHello method is called, it calls handler.invoke internally.
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker; // the MockClusterInvoker instance
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
// Returns the invocation result
// @param proxy the service proxy (a com.alibaba.dubbo.common.bytecode.proxy0 instance)
// @param method the method being called
// @param args the method arguments
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// methods declared on Object are invoked directly
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// starting point of the RPC call: build the Invocation
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
The invoker here is the MockClusterInvoker instance described above.
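The overall shape (a proxy that implements the business interface and forwards every call to an InvocationHandler, which turns it into an invoker call) can be reproduced with the JDK's own java.lang.reflect.Proxy. Note the swap: the walkthrough's proxy0 class is generated by Dubbo's bytecode Proxy, while this sketch uses the JDK dynamic proxy purely for illustration. DemoService, Invoker and InvokerHandler here are simplified, hypothetical stand-ins.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical sketch using the JDK dynamic proxy instead of Dubbo's generated proxy0.
final class ProxySketch {

    interface DemoService {
        String sayHello(String name);
    }

    // Simplified stand-in for an invoker: receives the method name and arguments.
    interface Invoker {
        Object invoke(String methodName, Object[] args);
    }

    static final class InvokerHandler implements InvocationHandler {
        private final Invoker invoker;

        InvokerHandler(Invoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Methods declared on Object are answered by the invoker object itself.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Everything else becomes a (simulated) remote call.
            return invoker.invoke(method.getName(), args);
        }
    }

    static DemoService create(Invoker invoker) {
        return (DemoService) Proxy.newProxyInstance(
                DemoService.class.getClassLoader(),
                new Class<?>[]{DemoService.class},
                new InvokerHandler(invoker));
    }

    public static void main(String[] args) {
        DemoService demoService = create((method, arguments) -> method + ":" + arguments[0]);
        System.out.println(demoService.sayHello("world"));
    }
}
```

The caller only ever sees DemoService; whether the call is served locally or sent over the shared connection is decided entirely behind the handler.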
At this point, the line DemoService demoService = (DemoService) context.getBean("demoService"); has finished. The resulting demoService is a com.alibaba.dubbo.common.bytecode.proxy0 instance (a proxy).