Dubbo源码(七) – 集群
前言
本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo
集群(cluster)就是一组计算机,它们作为一个总体向用户提供一组网络资源。这些单个的计算机系统就是集群的节点(node)。
在Dubbo中,为了避免单点故障,同一个服务允许有多个服务提供者,也允许同时连接多个注册中心。那么,服务消费者引用服务时,该请求哪个注册中心的服务提供者以及调用失败之后该如何处理呢?这些就是Dubbo集群所做的事。
集群容错
在分析集群源码之前,先看看集群容错的所有组件,下图是官方文档的组件图
Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker:
- Cluster 是接口,其只有一个方法,负责生成Cluster Invoker
- Cluster Invoker继承了Invoker接口,是一个 Invoker,是主要逻辑实现的地方
将上图从中间切分,可将集群工作过程分为两个阶段,左边为第一阶段。
-
第一个阶段是在服务消费者初始化期间。
集群 Cluster 实现类为服务消费者创建 Cluster Invoker,即图上的 merge 操作,也就是将多个服务提供者合并为一个 Cluster Invoker
-
第二个阶段是在服务消费者进行远程调用时。
步骤大体上就如图所示:list → route → select → invoke
- list:从服务目录拿到 invoker 集合
- route:通过路由过滤出符合规则的 invoker 集合
- select:通过负载均衡从 invoker 集合中选择一个
- invoke:执行 invoker 的 invoke 方法,进行真正的远程调用
其中,list、route操作在之前文章讲过了,传送门:《服务目录》、《服务路由》
select 不是本文重点,后续负载均衡时讲解。
以上就是集群工作的整个流程,这里并没有介绍集群是如何容错的,也就是 invoke 步骤调用失败的处理。Dubbo提供了多种容错方式:集群容错示例
下面的源码我们以默认的 Failover Cluster – 失败自动切换 进行分析
源码分析
Cluster
首先来看看 Cluster 接口,这是一个自适应拓展类,默认实现为FailoverCluster
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
前面讲了,Cluster 的作用就是将多个服务提供者合并为一个 Cluster Invoker
多个服务提供者合并也就是 服务目录(Directory) 中的 invoker 集合。join 方法返回了一个 Cluster Invoker
接下来,我们看看调用路径。 Cluster 接口在多个地方被调用,我们看服务消费者初始化期间的调用。
// 调用路径如下:
// ReferenceBean#getObject()
// ReferenceConfig#get()
// ReferenceConfig#init()
// ReferenceConfig#createProxy(Map<String, String> map)
// RegistryProtocol#refer(Class<T> type, URL url)
// RegistryProtocol#doRefer(Cluster cluster, Registry registry, Class<T> type, URL url)
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心和协议
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服务消费者链接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注册服务消费者,在 consumers 目录下新节点
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
// 订阅 providers、configurators、routers 等节点数据
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
调用在倒数第三行代码。
如果看过我之前写的《服务引用》那篇文章,想必对 doRefer 方法不陌生了。在服务目录订阅完注册中心的数据后,就调用 join 方法生成 Cluster Invoker
啰嗦多一句:
可以这么理解,实际负责远程调用的,是服务目录中的 invoker 集合中的 invoker,而 Cluster Invoker 则对服务目录中的 invoker 集合进行处理。
Cluster Invoker
默认的 Cluster Invoker 是FailoverClusterInvoker
,既然是一个 Invoker,我们就从它的 invoke 方法入手。
AbstractClusterInvoker
invoke 方法在它的父类AbstractClusterInvoker
中
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;
// 绑定 attachments 到 invocation 中.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// 列举 Invoker
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
// 加载 LoadBalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 调用 doInvoke 进行后续操作
return doInvoke(invocation, invokers, loadbalance);
}
invoke 方法逻辑也很简单:
- 列举 invoker
- 加载 LoadBalance(自适应拓展类)
- 调用 doInvoke 进行后续操作
其中列举 invoker 如下
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
list 方法就是调用服务目录的 list 方法,里面做了两件事(结合前面的组件图):
- list:从服务目录拿到 invoker 集合
- route:通过路由过滤出符合规则的 invoker 集合
FailoverClusterInvoker
doInvoke 方法具体实现在FailoverClusterInvoker
中,此 invoker 的容错方式为失败自动切换。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 获取重试次数
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// 循环调用,失败重试
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
// 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
// 通过调用 list 可得到最新可用的 Invoker 列表
copyinvokers = list(invocation);
// check again
// 对 copyinvokers 进行判空检查
checkInvokers(copyinvokers, invocation);
}
// 通过负载均衡选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 添加到 invoker 到 invoked 列表中
invoked.add(invoker);
// 设置 invoked 到 RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// 调用目标 Invoker 的 invoke 方法
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 若重试失败,则抛出异常
throw new RpcException(xxx);
}
doInvoke 方法代码量不少,但是逻辑简化之后也很简单,就是根据重试次数,在 for 循环中进行远程调用,成功则返回,失败就重试。如果重试次数耗尽还无法调用成功,则抛出异常。
从这里可以知道,Dubbo的默认失败重试次数是3次。
此方法中我们关注下 select 方法,它负责从 invoker 集合中选出一个 infoker
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
// 获取调用方法名
String methodName = invocation == null ? "" : invocation.getMethodName();
// 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的
// 调用同一个服务提供者,除非该提供者挂了再进行切换
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
{
// 检测 invokers 列表是否包含 stickyInvoker,如果不包含,
// 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
// 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 包含
// stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供服务。
// 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
// availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的
// isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
}
// 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。
// 此时继续调用 doSelect 选择 Invoker
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
// 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}
select 方法主要处理对粘滞连接特性的支持。注释写的很清楚了。选择 invoker 的操作在 doSelect 方法
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
if (invokers.size() == 1)
return invokers.get(0);
if (loadbalance == null) {
// 如果 loadbalance 为空,这里通过 SPI 加载 Loadbalance,默认为 RandomLoadBalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
// 通过负载均衡组件选择 Invoker
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
// 如果 selected 包含负载均衡选择出的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
// 进行重选
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
// rinvoker 为空,定位 invoker 在 invokers 中的位置
int index = invokers.indexOf(invoker);
try {
// 获取 index + 1 位置处的 Invoker,以下代码等价于:
// invoker = invokers.get((index + 1) % invokers.size());
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
这里通过负载均衡选出 invoker,如果 invoker 在 selected 中(就是在doInvoke方法中调用失败的invoker)或者不可用,则调用 reselect 方法进行重选。如果重选还是选不出 invoker,则返回 invoker 集合中的下一个元素。这里的繁琐判断,就是为了尽量保证拿到可用的 invoker
我们继续看看 reselect 方法
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
throws RpcException {
List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
// 下面的 if-else 分支逻辑有些冗余,pull request #2826 对这段代码进行了简化,可以参考一下
// 根据 availablecheck 进行不同的处理
if (availablecheck) { // invoker.isAvailable() should be checked
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
} else { // do not check invoker.isAvailable()
for (Invoker<T> invoker : invokers) {
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
{
// 若线程走到此处,说明 reselectInvokers 集合为空,此时不会调用负载均衡组件进行筛选。
// 这里从 selected 列表中查找可用的 Invoker,并将其添加到 reselectInvokers 集合中
if (selected != null) {
for (Invoker<T> invoker : selected) {
if ((invoker.isAvailable()) // available first
&& !reselectInvokers.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
return null;
}
这个方法可以分成两部分:
- 在非 selected 的 invoker 集合中,调用负载均衡选择一个 invoker
- 在步骤1无法选出 invoker 时,在 selected 中选出 invoker
至此,Dubbo的集群就讲完了。负载均衡有空再说。
再论Cluster
前面我们提到 Cluster 接口在多个地方被调用,也讲了同一个服务有多个服务提供者时的处理。那么,有多个注册中心呢,该如何处理?
// 类ReferenceConfig
private T createProxy(Map<String, String> map) {
......
// 本地引用
if (isJvmRefer) {
......
// 远程引用
} else {
......
// 单个注册中心或服务提供者(服务直连,下同)
if (urls.size() == 1) {
// 调用 RegistryProtocol 的 refer 构建 Invoker 实例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多个注册中心或多个服务提供者,或者两者混合
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 获取所有的 Invoker
for (URL url : urls) {
// 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
// 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register"s cluster is available
// 如果注册中心链接不为空,则将使用 AvailableCluster
URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
// 生成代理类
return (T) proxyFactory.getProxy(invoker);
}
createProxy 是服务引用时,生成服务代理对象的方法。这里会判断,如果有多个注册中心,会再封装一层集群,也就是先选择注册中心,再选择服务提供者。
这里一般情况 registryURL 不为空,cluster 使用的是AvailableCluster
public class AvailableCluster implements Cluster {
public static final String NAME = "available";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new AbstractClusterInvoker<T>(directory) {
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
};
}
}
AvailableCluster的逻辑很简单,按顺序选择可使用的 invoker (这里的invoker其实就是每个注册中心)
总结
本篇文章介绍了Dubbo集群容错的整体工作过程和调用逻辑。Dubbo提供了多种集群实现,本文只介绍了Failover Cluster,其余实现感兴趣的可以自行查看源码。
参考资料
Dubbo开发指南