基于Apache Zookeeper手写实现动态配置中心(纯代码实践)
相信大家都知道,每个项目中会有一些配置信息放在一个独立的properties文件中,比如application.properties。这个文件中会放一些常量的配置,比如数据库连接信息、线程池大小、限流参数。
在传统的开发模式下,这种方式很方便,一方面能够对配置进行统一管理,另一方面,我们在维护的时候很方便。
但是随着业务的发展以及架构的升级,在微服务架构中,服务的数量以及每个服务涉及到的配置会越来越多,并且对于配置管理的需求越来越高,比如要求实时性、独立性。
另外,在微服务架构下,会涉及到不同的环境下的配置管理、灰度发布、动态限流、动态降级等需求,包括对于配置内容的安全与权限,所以传统的配置维护方式很难达到需求。
因此,就产生了分布式配置中心。
- 传统的配置方式不方便维护
- 配置内容的安全和访问权限,在传统的配置方式中很难实现
- 更新配置内容时,需要重启
配置中心的工作流程
图11-1
Spring Boot的外部化配置
在本次课程中,我们会Zookeeper集成到Spring Boot的外部化配置中,让用户无感知的使用配置中心上的数据作为数据源,所以我们需要先了解Spring Boot中的外部化配置。
Spring Boot的外部化配置是基于Environment来实现的,它表示Spring Boot应用运行时的环境信息,先来看基本使用
Environment的使用
-
在spring boot应用中,修改aplication.properties配置
key=value
-
创建一个Controller进行测试
@RestController public class EnvironementController { @Autowired Environment environment; @GetMapping("/env") public String env(){ return environment.getProperty("key"); } }
@Value注解使用
在properties文件中定义的属性,除了可以通过environment的getProperty方法获取之外,spring还提供了@Value注解,
@RestController
public class EnvironementController {
@Value("${env}")
private String env;
@GetMapping("/env")
public String env(){
return env;
}
}
spring容器在加载一个bean时,当发现这个Bean中有@Value注解时,那么它可以从Environment中将属性值进行注入,如果Environment中没有这个属性,则会报错。
Environment设计猜想
Spring Boot的外部化配置,不仅仅只是appliation.properties,包括命令行参数、系统属性、操作系统环境变量等,都可以作为Environment的数据来源。
- @Value(“${java.version}”) 获取System.getProperties , 获取系统属性
- 配置command的jvm参数,
-Denvtest=command
,然后通过@Value("${envtest}")
图11-2
- 第一部分是属性定义,这个属性定义可以来自于很多地方,比如application.properties、或者系统环境变量等。
- 然后根据约定的方式去指定路径或者指定范围去加载这些配置,保存到内存中。
- 最后,我们可以根据指定的key从缓存中去查找这个值。
扩展Environment
我们可以自己扩展Environment中的数据源,代码如下;
其中,EnvironmentPostProcessor:它可以在spring上下文构建之前可以设置一些系统配置。
CusEnvironmentPostProcessor
public class CusEnvironmentPostProcessor implements EnvironmentPostProcessor {
private final Properties properties=new Properties();
private String propertiesFile="custom.properties";
@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
Resource resource=new ClassPathResource(propertiesFile);
environment.getPropertySources().addLast(loadProperties(resource));
}
private PropertySource<?> loadProperties(Resource resource){
if(!resource.exists()){
throw new IllegalArgumentException("file:{"+resource+"} not exist");
}
try {
properties.load(resource.getInputStream());
return new PropertiesPropertySource(resource.getFilename(),properties);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
custom.properties
在classpath目录下创建custom.properties文件
name=mic
age=18
spring.factories
在META-INF目录下创建spring.factories文件,因为EnvironmentPostProcessor的扩展实现是基于SPI机制完成的。
org.springframework.boot.env.EnvironmentPostProcessor=
com.example.springbootzookeeper.CusEnvironmentPostProcessor
TestController
创建测试类,演示自定义配置加载的功能。
@RestController
public class TestController {
@Value("${name}")
public String val;
@GetMapping("/")
public String say(){
return val;
}
}
总结
通过上面的例子我们发现,在Environment中,我们可以通过指定PropertySources来增加Environment外部化配置信息,使得在Spring Boot运行期间自由访问到这些配置。
那么我们要实现动态配置中心,无非就是要在启动的时候,从远程服务器上获取到数据保存到PropertySource中,并且添加到Environment。
下面我们就开始来实现这个过程。
Zookeeper实现配置中心
在本小节中,主要基于Spring的Environment扩展实现自己的动态配置中心,代码结构如图11-3所示。
图11-3
自定义配置中心的相关说明
在本次案例中,我们并没有完全使用EnvironmentPostProcessor这个扩展点,而是基于SpringFactoriesLoader自定义了一个扩展点,主要目的是让大家知道EnvironmentPostProcessor扩展点的工作原理,以及我们以后自己也可以定义扩展点。
代码实现
以下是所有代码的实现过程,按照下面这个步骤去开发即可完成动态配置中心。
ZookeeperApplicationContextInitializer
ApplicationContextInitializer扩展,它是在ConfigurableApplicationContext通过调用refresh函数来初始化Spring容器之前会进行回调的一个扩展方法,我们可以在这个扩展中实现Environment的扩展。
所以这个类的主要作用就是在ApplicationContext完成refresh之前,扩展Environment,增加外部化配置注入。
public class ZookeeperApplicationContextInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext>{
//PropertySourceLocator接口支持扩展自定义配置加载到spring Environment中。
private final List<PropertySourceLocator> propertySourceLocators;
public ZookeeperApplicationContextInitializer(){
//基于SPI机制加载所有的外部化属性扩展点
ClassLoader classLoader=ClassUtils.getDefaultClassLoader();
//这部分的代码是SPI机制
propertySourceLocators=new ArrayList<>(SpringFactoriesLoader.loadFactories(PropertySourceLocator.class,classLoader));
}
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
//获取运行的环境上下文
ConfigurableEnvironment environment=applicationContext.getEnvironment();
//MutablePropertySources它包含了一个CopyOnWriteArrayList集合,用来包含多个PropertySource。
MutablePropertySources mutablePropertySources = environment.getPropertySources();
for (PropertySourceLocator locator : this.propertySourceLocators) {
//回调所有实现PropertySourceLocator接口实例的locate方法,收集所有扩展属性配置保存到Environment中
Collection<PropertySource<?>> source = locator.locateCollection(environment,applicationContext);
if (source == null || source.size() == 0) {
continue;
}
//把PropertySource属性源添加到environment中。
for (PropertySource<?> p : source) {
//addFirst或者Last决定了配置的优先级
mutablePropertySources.addFirst(p);
}
}
}
}
创建classpath:/META-INF/spring.factories
org.springframework.context.ApplicationContextInitializer=
com.gupaoedu.example.zookeepercuratordemo.config.ZookeeperApplicationContextInitializer
PropertySourceLocator
PropertySourceLocator接口支持扩展自定义配置加载到spring Environment中。
public interface PropertySourceLocator {
PropertySource<?> locate(Environment environment,ConfigurableApplicationContext applicationContext);
//Environment表示环境变量信息
//applicationContext表示应用上下文
default Collection<PropertySource<?>> locateCollection(Environment environment, ConfigurableApplicationContext applicationContext) {
return locateCollection(this, environment,applicationContext);
}
static Collection<PropertySource<?>> locateCollection(PropertySourceLocator locator,
Environment environment,ConfigurableApplicationContext applicationContext) {
PropertySource<?> propertySource = locator.locate(environment,applicationContext);
if (propertySource == null) {
return Collections.emptyList();
}
return Arrays.asList(propertySource);
}
}
ZookeeperPropertySourceLocator
ZookeeperPropertySourceLocator用来实现基于Zookeeper属性配置的扩展点,它会访问zookeeper获取远程服务器数据。
public class ZookeeperPropertySourceLocator implements PropertySourceLocator{
private final CuratorFramework curatorFramework;
private final String DATA_NODE="/data"; //仅仅为了演示,所以写死目标数据节点
public ZookeeperPropertySourceLocator() {
curatorFramework= CuratorFrameworkFactory.builder()
.connectString("192.168.221.128:2181")
.sessionTimeoutMs(20000).connectionTimeoutMs(20000)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.namespace("config").build();
curatorFramework.start();
}
@Override
public PropertySource<?> locate(Environment environment, ConfigurableApplicationContext applicationContext) {
System.out.println("开始加载远程配置到Environment中");
CompositePropertySource composite = new CompositePropertySource("configService");
try {
Map<String,Object> dataMap=getRemoteEnvironment();
//基于Map结构的属性源
MapPropertySource mapPropertySource=new MapPropertySource("configService",dataMap);
composite.addPropertySource(mapPropertySource);
addListener(environment,applicationContext);
} catch (Exception e) {
e.printStackTrace();
}
return composite;
}
private Map<String,Object> getRemoteEnvironment() throws Exception {
String data=new String(curatorFramework.getData().forPath(DATA_NODE));
//暂时支持json格式
ObjectMapper objectMapper=new ObjectMapper();
Map<String,Object> map=objectMapper.readValue(data, Map.class);
return map;
}
//添加节点变更事件
private void addListener(Environment environment, ConfigurableApplicationContext applicationContext){
NodeDataCuratorCacheListener curatorCacheListener=new NodeDataCuratorCacheListener(environment,applicationContext);
CuratorCache curatorCache=CuratorCache.build(curatorFramework,DATA_NODE,CuratorCache.Options.SINGLE_NODE_CACHE);
CuratorCacheListener listener=CuratorCacheListener
.builder()
.forChanges(curatorCacheListener).build();
curatorCache.listenable().addListener(listener);
curatorCache.start();
}
}
配置扩展点: classpath:/META-INF/spring.factories
com.gupaoedu.example.zookeepercuratordemo.config.PropertySourceLocator=
com.gupaoedu.example.zookeepercuratordemo.config.ZookeeperPropertySourceLocator
配置动态变更逻辑
NodeDataCuratorCacheListener
NodeDataCuratorCacheListener用来实现持久化订阅机制,当目标节点数据发生变更时,需要收到变更并且应用。
public class NodeDataCuratorCacheListener implements CuratorCacheListenerBuilder.ChangeListener {
private Environment environment;
private ConfigurableApplicationContext applicationContext;
public NodeDataCuratorCacheListener(Environment environment, ConfigurableApplicationContext applicationContext) {
this.environment = environment;
this.applicationContext=applicationContext;
}
@Override
public void event(ChildData oldNode, ChildData node) {
System.out.println("数据发生变更");
String resultData=new String(node.getData());
ObjectMapper objectMapper=new ObjectMapper();
try {
Map<String,Object> map=objectMapper.readValue(resultData, Map.class);
ConfigurableEnvironment cfe=(ConfigurableEnvironment)environment;
MapPropertySource mapPropertySource=new MapPropertySource("configService",map);
cfe.getPropertySources().replace("configService",mapPropertySource);
//发布事件,用来更新@Value注解对应的值(事件机制可以分两步演示)
applicationContext.publishEvent(new EnvironmentChangeEvent(this));
System.out.println("数据更新完成");
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
EnvironmentChangeEvent
定义一个环境变量变更事件。
public class EnvironmentChangeEvent extends ApplicationEvent {
public EnvironmentChangeEvent(Object source) {
super(source);
}
}
ConfigurationPropertiesRebinder
ConfigurationPropertiesRebinder接收事件,并重新绑定@Value注解的数据,使得数据能够动态改变
@Component
public class ConfigurationPropertiesRebinder implements ApplicationListener<EnvironmentChangeEvent> {
private ConfigurationPropertiesBeans beans;
private Environment environment;
public ConfigurationPropertiesRebinder(ConfigurationPropertiesBeans beans,Environment environment) {
this.beans = beans;
this.environment=environment;
}
@Override
public void onApplicationEvent(EnvironmentChangeEvent event) {
rebind();
}
public void rebind(){
this.beans.getFieldMapper().forEach((k,v)->{
v.forEach(f->f.resetValue(environment));
});
}
}
ConfigurationPropertiesBeans
ConfigurationPropertiesBeans实现了BeanPostPorocessor接口,该接口我们也叫后置处理器,作用是在Bean对象在实例化和依赖注入完毕后,在显示调用初始化方法的前后添加我们自己的逻辑。注意是Bean实例化完毕后及依赖注入完成后触发的。
我们可以在这个后置处理器的回调方法中,扫描指定注解的bean,收集这些属性,用来触发事件变更。
@Component
public class ConfigurationPropertiesBeans implements BeanPostProcessor {
private Map<String,List<FieldPair>> fieldMapper=new HashMap<>();
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
Class clz=bean.getClass();
if(clz.isAnnotationPresent(RefreshScope.class)){ //如果某个bean声明了RefreshScope注解,说明需要进行动态更新
for(Field field:clz.getDeclaredFields()){
Value value=field.getAnnotation(Value.class);
List<String> keyList=getPropertyKey(value.value(),0);
for(String key:keyList){
//使用List<FieldPair>存储的目的是,如果在多个bean中存在相同的key,则全部进行替换
fieldMapper.computeIfAbsent(key,(k)->new ArrayList()).add(new FieldPair(bean,field,value.value()));
}
}
}
return bean;
}
//获取key信息,也就是${value}中解析出value这个属性
private List<String> getPropertyKey(String value,int begin){
int start=value.indexOf("${",begin)+2;
if(start<2){
return new ArrayList<>();
}
int middle=value.indexOf(":",start);
int end=value.indexOf("}",start);
String key;
if(middle>0&&middle<end){
key=value.substring(start,middle);
}else{
key=value.substring(start,end);
}
//如果是这种用法,就需要递归,@Value("${swagger2.host:127.0.0.1:${server.port:8080}}")
List<String> keys=getPropertyKey(value,end);
keys.add(key);
return keys;
}
public Map<String, List<FieldPair>> getFieldMapper() {
return fieldMapper;
}
}
RefreshScope
定义注解来实现指定需要动态刷新类的识别。
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RefreshScope {
}
FieldPair
这个类中主要通过PropertyPlaceholderHelper将字符串里的占位符内容,用我们配置的properties里的替换。
public class FieldPair {
private static PropertyPlaceholderHelper propertyPlaceholderHelper=new PropertyPlaceholderHelper("${","}",":",true);
private Object bean;
private Field field;
private String value;
public FieldPair(Object bean, Field field, String value) {
this.bean = bean;
this.field = field;
this.value = value;
}
public void resetValue(Environment environment){
boolean access=field.isAccessible();
if(!access){
field.setAccessible(true);
}
//从新从environment中将占位符替换为新的值
String resetValue=propertyPlaceholderHelper.replacePlaceholders(value,((ConfigurableEnvironment) environment)::getProperty);
try {
//通过反射更新
field.set(bean,resetValue);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
访问测试ConfigController
@RefreshScope
@RestController
public class ConfigController {
@Value("${name}")
private String name;
@Value("${job}")
private String job;
@GetMapping
public String get(){
return name+":"+job;
}
}
基于自定义PropertySourceLocator扩展
由于在上述代码中,我们创建了一个PropertySourceLocator接口,并且在整个配置加载过程中,我们都是基于PropertySourceLocator扩展点来进行加载的,所以也就是意味着除了上述使用的Zookeeper作为远程配置装载以外,我们还可以通过扩展PropertySourceLocator来实现其他的扩展,具体实现如下
CustomPropertySourceLocator
创建一个MapPropertySource作为Environment的属性源。
public class CustomPropertySourceLocator implements PropertySourceLocator{
@Override
public PropertySource<?> locate(Environment environment, ConfigurableApplicationContext applicationContext) {
Map<String, Object> source = new HashMap<>();
source.put("age","18");
MapPropertySource propertiesPropertySource = new MapPropertySource("configCenter",source);
return propertiesPropertySource;
}
}
spring.factories
由于CustomPropertySourceLocator是自定义扩展点,所以我们需要在spring.factories文件中定义它的扩展实现,修改如下
com.gupaoedu.example.zookeepercuratordemo.config.PropertySourceLocator=
com.gupaoedu.example.zookeepercuratordemo.config.ZookeeperPropertySourceLocator,
com.gupaoedu.example.zookeepercuratordemo.config.CustomPropertySourceLocator
ConfigController
接下来,我们通过下面的代码进行测试,从结果可以看到,我们自己定义的propertySource被加载到Environment中了。
@RefreshScope
@RestController
public class ConfigController {
@Value("${name}")
private String name;
@Value("${job}")
private String job;
@Value("${age}")
private String age;
@GetMapping
public String get(){
return name+":"+job+":"+age;
}
}
关注[跟着Mic学架构]公众号,获取更多精品原创