Nacos源码之一-配置自动更新

Nacos源码之一-配置自动更新Nacos 是阿里巴巴开源的集分布式配置中心、分布式注册中心为一体的分布式解决方案。它的优点:提供命令空间,方便管理不同环境的配置;提供web界

大家好,欢迎来到IT知识分享网。

Nacos 是阿里巴巴开源的集分布式配置中心、分布式注册中心为一体的分布式解决方案。

它的优点:

  1. 提供命令空间,方便管理不同环境的配置;
  2. 提供web界面,方便管理配置和服务;
  3. 支持配置版本管理,回滚;
  4. 支持服务管理,手动上线、下线服务。

等等其他优点。

1 如何使用 Nacos 自动更新配置

1.1 配置自动更新的两种方式

第一种方式

  1. 属性使用@Value注解
  2. 类使用@RefreshScope 注解
@RefreshScope @RequestMapping("config") public class ConfigController {     @Value("${useLocalCache:false}")     private boolean useLocalCache; } 

第二种方式

  1. 使用@NacosValue注解,自动更新配置成true
@Controller @RequestMapping("config") public class ConfigController {     @NacosValue(value = "${useLocalCache:false}", autoRefreshed = true)     private boolean useLocalCache;     @RequestMapping(value = "/get", method = GET)     @ResponseBody     public boolean get() {         return useLocalCache;     } } 

2 Nacos 配置更新源码分析

先上图

Nacos源码之一-配置自动更新

Nacos配置更新流程图

原图链接:https://www.processon.com/view/link/60b0238e3cde5

具体步骤

1、第一步(更新数据库)比较简单,就是判断是新增配置还是修改配置,然后修改或者新增数据库响应信息(根据使用的数据库是内嵌derby还是mysql使用不同的实现)。

2、比较复杂的是通知(更新文件系统中的文件)

2.1 通过发布订阅模式,发布配置变更事件

2.2 订阅者接收到消息后,调用controller请求(communication/dataChange)

2.3 这个controller请求,启动一个异步任务,这个异步任务更新配置数据到指定的配置文件(nacos目录下)

之后通过发布-订阅模式通知其他服务,其他服务接收到通知之后,更新配置。

PS:

1、controller 请求http://ip:port/nacos/v1/cs/communication/dataChange?dataId=example&group=DEFAULT_GROUP

2、配置存储有两个地方,一个是在文件系统中,另一个是配置的数据库中(可能是内嵌的derby数据库和MySQL数据库)

3、一些配置基本信息,比如配置文件的 MD5 值、dataId、groupName等,会保存在 ConcurrentHashMap 存储的缓存的

源码1: 添加异步任务更新配置文件代码

/**  * Add DumpTask to TaskManager, it will execute asynchronously.  */ public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {     String groupKey = GroupKey2.getKey(dataId, group, tenant);     String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));     dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));     DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); } 

然后通过persistService读取数据库中的相应记录

源码2: 读取数据库中的最新配置数据

ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant); 

将读取到的内容,写入到本地文件nacos/distribution/target/nacos-server-1.4.2/nacos/data/config-data/${GROUP_NAME}/${dataId}中,并更新配置文件的md5值。

源码3: 保存配置文件到文件目录中,并更新文件的 MD5 值

/**  * Save config file and update md5 value in cache.  *  * @param dataId         dataId string value.  * @param group          group string value.  * @param tenant         tenant string value.  * @param content        content string value.  * @param lastModifiedTs lastModifiedTs.  * @param type           file type.  * @return dumpChange success or not.  */ public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,         String type) {     String groupKey = GroupKey2.getKey(dataId, group, tenant);     CacheItem ci = makeSure(groupKey);     ci.setType(type);     final int lockResult = tryWriteLock(groupKey);     assert (lockResult != 0);          if (lockResult < 0) {         DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);         return false;     }          try {         final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);                  if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {             DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "                             + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),                     lastModifiedTs);         } else if (!PropertyUtil.isDirectRead()) {          // 保存数据到文件中             DiskUtil.saveToDisk(dataId, group, tenant, content);         }         updateMd5(groupKey, md5, lastModifiedTs);         return true;     } catch (IOException ioe) {         DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);         if (ioe.getMessage() != null) {             String errMsg = ioe.getMessage();             if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg                     .contains(DISK_QUATA_EN)) {                 // Protect from disk full.                 FATAL_LOG.error("磁盘满自杀退出", ioe);                 System.exit(0);             }         }         return false;     } finally {         releaseWriteLock(groupKey);     } } /**  * Save configuration information to disk.  */ public static void saveToDisk(String dataId, String group, String tenant, String content) throws IOException {     File targetFile = targetFile(dataId, group, tenant);     FileUtils.writeStringToFile(targetFile, content, Constants.ENCODE); } 

3 源码亮点

3.1 发布订阅模式

发布订阅模式使用于一个事件发生需要通知多个订阅者的场景。实际开发中,也能会有不同的名字,大家要能够识别,比如:Subject-Observer(经典例子)、Publisher-Subscriber(Nacos)、Producer-Consumer(Kafka)、Dispatcher-Listener。

发布变更事件

/**  * Request publisher publish event Publishers load lazily, calling publisher.  *  * @param eventType class Instances type of the event type.  * @param event     event instance.  */ private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {     if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {         return INSTANCE.sharePublisher.publish(event);     }          final String topic = ClassUtils.getCanonicalName(eventType);          EventPublisher publisher = INSTANCE.publisherMap.get(topic);     if (publisher != null) {         return publisher.publish(event);     }     LOGGER.warn("There are no [{}] publishers for this event, please register", topic);     return false; } 

循环订阅者列表,通知订阅者。

/**  * Receive and notifySubscriber to process the event.  *  * @param event {@link Event}.  */ void receiveEvent(Event event) {     final long currentEventSequence = event.sequence();          // Notification single event listener     for (Subscriber subscriber : subscribers) {         // Whether to ignore expiration events         if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {             LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",                     event.getClass());             continue;         }                  // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.         // Remove original judge part of codes.         notifySubscriber(subscriber, event);     } }      

通过线程池处理订阅者任务,因为这里订阅者需要调用HTTP请求来处理更新,所以通过线程池可以解耦

@Override public void notifySubscriber(final Subscriber subscriber, final Event event) {          LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);          final Runnable job = new Runnable() {         @Override         public void run() {             subscriber.onEvent(event);         }     };          final Executor executor = subscriber.executor();          if (executor != null) {         executor.execute(job);     } else {         try {             job.run();         } catch (Throwable e) {             LOGGER.error("Event callback exception : {}", e);         }     } } 

3.2 任务管理器

这里存在竞争资源—单个配置文件,也就是确定Group下的dataId文件或者数据库记录。

  1. 失败重试
/**  * process tasks in execute engine.  */ protected void processTasks() {     Collection<Object> keys = getAllTaskKeys();     for (Object taskKey : keys) {         AbstractDelayTask task = removeTask(taskKey);         if (null == task) {             continue;         }         NacosTaskProcessor processor = getProcessor(taskKey);         if (null == processor) {             getEngineLog().error("processor not found for task, so discarded. " + task);             continue;         }         try {             // ReAdd task if process failed             if (!processor.process(task)) {                 retryFailedTask(taskKey, task);             }         } catch (Throwable e) {             getEngineLog().error("Nacos task execute error : " + e.toString(), e);             retryFailedTask(taskKey, task);         }     } } 

3.3. 添加任务处理并发

使用重入锁ReentrantLock

protected final ReentrantLock lock = new ReentrantLock(); @Override public void addTask(Object key, AbstractDelayTask newTask) {     lock.lock();     try {         AbstractDelayTask existTask = tasks.get(key);         if (null != existTask) {             newTask.merge(existTask);         }         tasks.put(key, newTask);     } finally {         lock.unlock();     } } 
  1. 缓存任务处理器对象,需要的时候直接通过本地缓存获取
private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>(); 

3.4 一个接口多个实现,根据条件选择

如果要实现多种不同数据库的操作的时候,Condition是非常有用的,比如这里要支持内嵌的derby数据库,也要支持MySQL,通过Condition可以根据需要使用不同的数据库实现。

传统企业里面有些业务系统需要支持多种不同的数据库,不同客户现场可能会使用不同的数据库,通过这种方式可以减少定制、减少到现场由于客户数据库不同而临时进行定制开发。

@Conditional(value = ConditionOnEmbeddedStorage.class) @Component public class EmbeddedStoragePersistServiceImpl implements PersistService { 
@Conditional(value = ConditionOnExternalStorage.class) @Component public class ExternalStoragePersistServiceImpl implements PersistService { 

如果本文对你有帮助,欢迎关注我的公众号 【哥妞】 ,带你深入 JAVA 的世界~

Nacos源码之一-配置自动更新

关注一波,通知更及时

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/76736.html

(0)

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信