大家好,欢迎来到IT知识分享网。
Nacos 是阿里巴巴开源的集分布式配置中心、分布式注册中心为一体的分布式解决方案。
它的优点:
- 提供命令空间,方便管理不同环境的配置;
- 提供web界面,方便管理配置和服务;
- 支持配置版本管理,回滚;
- 支持服务管理,手动上线、下线服务。
等等其他优点。
1 如何使用 Nacos 自动更新配置
1.1 配置自动更新的两种方式
第一种方式
- 属性使用@Value注解
- 类使用@RefreshScope 注解
@RefreshScope @RequestMapping("config") public class ConfigController { @Value("${useLocalCache:false}") private boolean useLocalCache; }
第二种方式
- 使用@NacosValue注解,自动更新配置成true
@Controller @RequestMapping("config") public class ConfigController { @NacosValue(value = "${useLocalCache:false}", autoRefreshed = true) private boolean useLocalCache; @RequestMapping(value = "/get", method = GET) @ResponseBody public boolean get() { return useLocalCache; } }
2 Nacos 配置更新源码分析
先上图
原图链接:https://www.processon.com/view/link/60b0238e3cde5
具体步骤
1、第一步(更新数据库)比较简单,就是判断是新增配置还是修改配置,然后修改或者新增数据库响应信息(根据使用的数据库是内嵌derby还是mysql使用不同的实现)。
2、比较复杂的是通知(更新文件系统中的文件)
2.1 通过发布订阅模式,发布配置变更事件
2.2 订阅者接收到消息后,调用controller请求(communication/dataChange)
2.3 这个controller请求,启动一个异步任务,这个异步任务更新配置数据到指定的配置文件(nacos目录下)
之后通过发布-订阅模式通知其他服务,其他服务接收到通知之后,更新配置。
PS:
1、controller 请求http://ip:port/nacos/v1/cs/communication/dataChange?dataId=example&group=DEFAULT_GROUP
2、配置存储有两个地方,一个是在文件系统中,另一个是配置的数据库中(可能是内嵌的derby数据库和MySQL数据库)
3、一些配置基本信息,比如配置文件的 MD5 值、dataId、groupName等,会保存在 ConcurrentHashMap 存储的缓存的
源码1: 添加异步任务更新配置文件代码
/** * Add DumpTask to TaskManager, it will execute asynchronously. */ public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta)); dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta)); DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); }
然后通过persistService读取数据库中的相应记录
源码2: 读取数据库中的最新配置数据
ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
将读取到的内容,写入到本地文件nacos/distribution/target/nacos-server-1.4.2/nacos/data/config-data/${GROUP_NAME}/${dataId}中,并更新配置文件的md5值。
源码3: 保存配置文件到文件目录中,并更新文件的 MD5 值
/** * Save config file and update md5 value in cache. * * @param dataId dataId string value. * @param group group string value. * @param tenant tenant string value. * @param content content string value. * @param lastModifiedTs lastModifiedTs. * @param type file type. * @return dumpChange success or not. */ public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs, String type) { String groupKey = GroupKey2.getKey(dataId, group, tenant); CacheItem ci = makeSure(groupKey); ci.setType(type); final int lockResult = tryWriteLock(groupKey); assert (lockResult != 0); if (lockResult < 0) { DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey); return false; } try { final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) { DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, " + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey), lastModifiedTs); } else if (!PropertyUtil.isDirectRead()) { // 保存数据到文件中 DiskUtil.saveToDisk(dataId, group, tenant, content); } updateMd5(groupKey, md5, lastModifiedTs); return true; } catch (IOException ioe) { DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe); if (ioe.getMessage() != null) { String errMsg = ioe.getMessage(); if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg .contains(DISK_QUATA_EN)) { // Protect from disk full. FATAL_LOG.error("磁盘满自杀退出", ioe); System.exit(0); } } return false; } finally { releaseWriteLock(groupKey); } } /** * Save configuration information to disk. */ public static void saveToDisk(String dataId, String group, String tenant, String content) throws IOException { File targetFile = targetFile(dataId, group, tenant); FileUtils.writeStringToFile(targetFile, content, Constants.ENCODE); }
3 源码亮点
3.1 发布订阅模式
发布订阅模式使用于一个事件发生需要通知多个订阅者的场景。实际开发中,也能会有不同的名字,大家要能够识别,比如:Subject-Observer(经典例子)、Publisher-Subscriber(Nacos)、Producer-Consumer(Kafka)、Dispatcher-Listener。
发布变更事件
/** * Request publisher publish event Publishers load lazily, calling publisher. * * @param eventType class Instances type of the event type. * @param event event instance. */ private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) { if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { return INSTANCE.sharePublisher.publish(event); } final String topic = ClassUtils.getCanonicalName(eventType); EventPublisher publisher = INSTANCE.publisherMap.get(topic); if (publisher != null) { return publisher.publish(event); } LOGGER.warn("There are no [{}] publishers for this event, please register", topic); return false; }
循环订阅者列表,通知订阅者。
/** * Receive and notifySubscriber to process the event. * * @param event {@link Event}. */ void receiveEvent(Event event) { final long currentEventSequence = event.sequence(); // Notification single event listener for (Subscriber subscriber : subscribers) { // Whether to ignore expiration events if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass()); continue; } // Because unifying smartSubscriber and subscriber, so here need to think of compatibility. // Remove original judge part of codes. notifySubscriber(subscriber, event); } }
通过线程池处理订阅者任务,因为这里订阅者需要调用HTTP请求来处理更新,所以通过线程池可以解耦。
@Override public void notifySubscriber(final Subscriber subscriber, final Event event) { LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber); final Runnable job = new Runnable() { @Override public void run() { subscriber.onEvent(event); } }; final Executor executor = subscriber.executor(); if (executor != null) { executor.execute(job); } else { try { job.run(); } catch (Throwable e) { LOGGER.error("Event callback exception : {}", e); } } }
3.2 任务管理器
这里存在竞争资源—单个配置文件,也就是确定Group下的dataId文件或者数据库记录。
- 失败重试
/** * process tasks in execute engine. */ protected void processTasks() { Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // ReAdd task if process failed if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } } }
3.3. 添加任务处理并发
使用重入锁ReentrantLock
protected final ReentrantLock lock = new ReentrantLock(); @Override public void addTask(Object key, AbstractDelayTask newTask) { lock.lock(); try { AbstractDelayTask existTask = tasks.get(key); if (null != existTask) { newTask.merge(existTask); } tasks.put(key, newTask); } finally { lock.unlock(); } }
- 缓存任务处理器对象,需要的时候直接通过本地缓存获取
private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>();
3.4 一个接口多个实现,根据条件选择
如果要实现多种不同数据库的操作的时候,Condition是非常有用的,比如这里要支持内嵌的derby数据库,也要支持MySQL,通过Condition可以根据需要使用不同的数据库实现。
传统企业里面有些业务系统需要支持多种不同的数据库,不同客户现场可能会使用不同的数据库,通过这种方式可以减少定制、减少到现场由于客户数据库不同而临时进行定制开发。
@Conditional(value = ConditionOnEmbeddedStorage.class) @Component public class EmbeddedStoragePersistServiceImpl implements PersistService {
@Conditional(value = ConditionOnExternalStorage.class) @Component public class ExternalStoragePersistServiceImpl implements PersistService {
如果本文对你有帮助,欢迎关注我的公众号 【哥妞】 ,带你深入 JAVA 的世界~
关注一波,通知更及时
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/76736.html