dubbo中从nacos注册中心取得实例的代码分析

dubbo中从nacos注册中心取得实例的代码分析

最近分析了dubbo-admin(代码连接:https://github.com/apache/dubbo-admin)管理平台如何从注册中心定时取得有效实例,同步到本地存储到本地内存中.

以下代码是dubbo-admin中关键代码实现流程的分析.

GovernanceConfiguration实现和Registry实现

首先接口GovernanceConfiguration实现采用的dubbo的SPI机制(SPI机制参照http://dubbo.apache.org/zh-cn/docs/source_code_guide/dubbo-spi.html),根据不同的key,具有不同的实现类.

在/resources/META-INF/dubbo.internal/org.apache.dubbo.admin.registry.config.GovernanceConfiguration下的内容,默认是@SPI(“zookeeper”).

zookeeper=org.apache.dubbo.admin.registry.config.impl.ZookeeperConfiguration
apollo=org.apache.dubbo.admin.registry.config.impl.ApolloConfiguration
nacos=org.apache.dubbo.admin.registry.config.impl.NacosConfiguration
consul=org.apache.dubbo.admin.registry.config.impl.ConsulConfiguration

每一个具体的注册中心实现init()方法.比如NacosConfiguration 的init 方法

public void init() {
    group = url.getParameter(Constants.GROUP_KEY, "DEFAULT_GROUP");
    nameSpace = url.getParameter(Constants.NAMESPACE_KEY, "public");
    configService = buildConfigService(url);
}

在类ConfigCenter 配置中心实现中,首先实现GovernanceConfiguration实例,根据configCenter 配置的协议提取不同的实现.

if (StringUtils.isNotEmpty(configCenter)) {
    configCenterUrl = formUrl(configCenter, configCenterGroup, configCenterGroupNameSpace, username, password);
    dynamicConfiguration = ExtensionLoader.getExtensionLoader(GovernanceConfiguration.class).getExtension(configCenterUrl.getProtocol());
    dynamicConfiguration.setUrl(configCenterUrl);
    dynamicConfiguration.init();

调用不同的GovernanceConfiguration 实现类

在类ConfigCenter 中,实现不同RegistryFactory,根据configCenter 配置的协议提取不同的实现.

@Bean
@DependsOn("governanceConfiguration")
Registry getRegistry() {
    Registry registry = null;
    if (registryUrl == null) {
        if (StringUtils.isBlank(registryAddress)) {
            throw new ConfigurationException("Either config center or registry address is needed, please refer to https://github.com/apache/incubator-dubbo-admin/wiki/Dubbo-Admin-configuration");
        }
        registryUrl = formUrl(registryAddress, registryGroup, registryNameSpace, username, password);
    }
    RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
    registry = registryFactory.getRegistry(registryUrl);
    return registry;
}

比如NacosRegistryFactory 实现nacos的注册中心

@Override
protected Registry createRegistry(URL url) {
    return new NacosRegistry(url, createNamingService(url));
}

上述实现中关键实现NamingService,利用反射实现NacosNamingService具体实例(NacosRegistryFactory)

public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

RegistryServerSync 实现注册中心的同步操作

RegistryServerSync实现InitializingBean,实现afterPropertiesSet()

public class RegistryServerSync implements InitializingBean, DisposableBean, NotifyListener {
@Override
public void afterPropertiesSet() throws Exception {
    logger.info("Init Dubbo Admin Sync Cache...");
    registry.subscribe(SUBSCRIBE, this);
}

FailbackRegistry实现Registry的subscribe() 方法

public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // Sending a subscription request to the server side
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;
        List<URL> urls = getCacheUrls(url);
        if (CollectionUtils.isNotEmpty(urls)) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedSubscribed(url, listener);
    }
}

由NacosRegistry类实现上述的doSubscribe(url, listener):

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    Set<String> serviceNames = getServiceNames(url, listener);

    //Set corresponding serviceNames for easy search later
    if (isServiceNamesWithCompatibleMode(url)) {
        for (String serviceName : serviceNames) {
            NacosInstanceManageUtil.setCorrespondingServiceNames(serviceName, serviceNames);
        }
    }

    doSubscribe(url, listener, serviceNames);
}

getServiceNames(url, listener) 主要过程:调用nacos server对应接口/nacos/v1/ns/service/list, 取得对应nacos注册中心的实例.

在getServiceNames 关键点启动定时任务,定时从nacos取得serviceNames,并且同步到监听者,参照下列代码

private Set<String> getServiceNames(URL url, NotifyListener listener) {
    if (isAdminProtocol(url)) {
        scheduleServiceNamesLookup(url, listener);
        return getServiceNamesForOps(url);
    } else {
        return getServiceNames0(url);
    }
}

scheduleServiceNamesLookup:newSingleThreadScheduledExecutor 定时任务取得serviceNames

private void scheduleServiceNamesLookup(final URL url, final NotifyListener listener) {
    if (scheduledExecutorService == null) {
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            Set<String> serviceNames = getAllServiceNames();
            filterData(serviceNames, serviceName -> {
                boolean accepted = false;
                for (String category : ALL_SUPPORTED_CATEGORIES) {
                    String prefix = category + SERVICE_NAME_SEPARATOR;
                    if (serviceName != null && serviceName.startsWith(prefix)) {
                        accepted = true;
                        break;
                    }
                }
                return accepted;
            });
            doSubscribe(url, listener, serviceNames);
        }, LOOKUP_INTERVAL, LOOKUP_INTERVAL, TimeUnit.SECONDS);
    }
}

doSubscribe(url, listener, serviceNames) 中根据代码追踪,会根据serviceNames 取得 List<Instance> instances

private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
        throws NacosException {
    EventListener eventListener = event -> {
        if (event instanceof NamingEvent) {
            NamingEvent e = (NamingEvent) event;
            List<Instance> instances = e.getInstances();


            if (isServiceNamesWithCompatibleMode(url)) {
                /**
                 * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
                 * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
                 */
                NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
                instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
            }
            notifySubscriber(url, listener, instances);
        }
    };

notifySubscriber 会调用NacosRegistry.this.notify

private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
    List<Instance> healthyInstances = new LinkedList<>(instances);
    if (healthyInstances.size() > 0) {
        // Healthy Instances
        filterHealthyInstances(healthyInstances);
    }
    List<URL> urls = toUrlWithEmpty(url, healthyInstances);
    NacosRegistry.this.notify(url, listener, urls);
}

最终在AbstractRegistry中调用listenlistener.notify(categoryList);

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    ......
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
        saveProperties(url);
    }
}

这样会实现NotifyListener 的 void notify(List<URL> urls)

public interface NotifyListener {
    void notify(List<URL> urls);

}

在RegistryServerSync会实现上述void notify(List<URL> urls) 的重写,最终会把最终的从注册中心取得实例存放到内存中

private final ConcurrentMap<String, ConcurrentMap<String, Map<String, URL>>> registryCache = new ConcurrentHashMap<>();

以上,从nacos注册中心得到实例关键代码分析完毕.

hmoban主题是根据ripro二开的主题,极致后台体验,无插件,集成会员系统
自学咖网 » dubbo中从nacos注册中心取得实例的代码分析