







每一个具体的注册中心实现init()方法.比如NacosConfiguration 的init 方法

public void init() {
    group = url.getParameter(Constants.GROUP_KEY, "DEFAULT_GROUP");
    nameSpace = url.getParameter(Constants.NAMESPACE_KEY, "public");
    configService = buildConfigService(url);

在类ConfigCenter 配置中心实现中,首先实现GovernanceConfiguration实例,根据configCenter 配置的协议提取不同的实现.

if (StringUtils.isNotEmpty(configCenter)) {
    configCenterUrl = formUrl(configCenter, configCenterGroup, configCenterGroupNameSpace, username, password);
    dynamicConfiguration = ExtensionLoader.getExtensionLoader(GovernanceConfiguration.class).getExtension(configCenterUrl.getProtocol());

调用不同的GovernanceConfiguration 实现类

在类ConfigCenter 中,实现不同RegistryFactory,根据configCenter 配置的协议提取不同的实现.

Registry getRegistry() {
    Registry registry = null;
    if (registryUrl == null) {
        if (StringUtils.isBlank(registryAddress)) {
            throw new ConfigurationException("Either config center or registry address is needed, please refer to https://github.com/apache/incubator-dubbo-admin/wiki/Dubbo-Admin-configuration");
        registryUrl = formUrl(registryAddress, registryGroup, registryNameSpace, username, password);
    RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
    registry = registryFactory.getRegistry(registryUrl);
    return registry;

比如NacosRegistryFactory 实现nacos的注册中心

protected Registry createRegistry(URL url) {
    return new NacosRegistry(url, createNamingService(url));


public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);

RegistryServerSync 实现注册中心的同步操作


public class RegistryServerSync implements InitializingBean, DisposableBean, NotifyListener {
public void afterPropertiesSet() throws Exception {
    logger.info("Init Dubbo Admin Sync Cache...");
    registry.subscribe(SUBSCRIBE, this);

FailbackRegistry实现Registry的subscribe() 方法

public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // Sending a subscription request to the server side
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;
        List<URL> urls = getCacheUrls(url);
        if (CollectionUtils.isNotEmpty(urls)) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);

        // Record a failed registration request to a failed list, retry regularly
        addFailedSubscribed(url, listener);

由NacosRegistry类实现上述的doSubscribe(url, listener):

public void doSubscribe(final URL url, final NotifyListener listener) {
    Set<String> serviceNames = getServiceNames(url, listener);

    //Set corresponding serviceNames for easy search later
    if (isServiceNamesWithCompatibleMode(url)) {
        for (String serviceName : serviceNames) {
            NacosInstanceManageUtil.setCorrespondingServiceNames(serviceName, serviceNames);

    doSubscribe(url, listener, serviceNames);

getServiceNames(url, listener) 主要过程:调用nacos server对应接口/nacos/v1/ns/service/list, 取得对应nacos注册中心的实例.

在getServiceNames 关键点启动定时任务,定时从nacos取得serviceNames,并且同步到监听者,参照下列代码

private Set<String> getServiceNames(URL url, NotifyListener listener) {
    if (isAdminProtocol(url)) {
        scheduleServiceNamesLookup(url, listener);
        return getServiceNamesForOps(url);
    } else {
        return getServiceNames0(url);

scheduleServiceNamesLookup:newSingleThreadScheduledExecutor 定时任务取得serviceNames

private void scheduleServiceNamesLookup(final URL url, final NotifyListener listener) {
    if (scheduledExecutorService == null) {
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            Set<String> serviceNames = getAllServiceNames();
            filterData(serviceNames, serviceName -> {
                boolean accepted = false;
                for (String category : ALL_SUPPORTED_CATEGORIES) {
                    String prefix = category + SERVICE_NAME_SEPARATOR;
                    if (serviceName != null && serviceName.startsWith(prefix)) {
                        accepted = true;
                return accepted;
            doSubscribe(url, listener, serviceNames);

doSubscribe(url, listener, serviceNames) 中根据代码追踪,会根据serviceNames 取得 List<Instance> instances

private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
        throws NacosException {
    EventListener eventListener = event -> {
        if (event instanceof NamingEvent) {
            NamingEvent e = (NamingEvent) event;
            List<Instance> instances = e.getInstances();

            if (isServiceNamesWithCompatibleMode(url)) {
                 * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
                 * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
                NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
                instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
            notifySubscriber(url, listener, instances);

notifySubscriber 会调用NacosRegistry.this.notify

private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
    List<Instance> healthyInstances = new LinkedList<>(instances);
    if (healthyInstances.size() > 0) {
        // Healthy Instances
    List<URL> urls = toUrlWithEmpty(url, healthyInstances);
    NacosRegistry.this.notify(url, listener, urls);


protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.

这样会实现NotifyListener 的 void notify(List<URL> urls)

public interface NotifyListener {
    void notify(List<URL> urls);


在RegistryServerSync会实现上述void notify(List<URL> urls) 的重写,最终会把最终的从注册中心取得实例存放到内存中

private final ConcurrentMap<String, ConcurrentMap<String, Map<String, URL>>> registryCache = new ConcurrentHashMap<>();


自学咖网 » dubbo中从nacos注册中心取得实例的代码分析