Nacos订阅概述
Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理(更新实例,更改本地缓存)。
以下是订阅方法的主线流程,涉及内容比较多,细节比较复杂,所以这里我们主要学习核心部分。
定时任务开启
其实订阅本质上就是服务发现的一种方式,也就是在服务发现的时候执行订阅方法,触发定时任务去拉取服务端的数据。
NacosNamingService中暴露的许多重载的subscribe,重载的目的就是让大家少写一些参数,这些参数呢,Nacos给默认处理了。最终这些重载方法都会调用到下面这个方法:
1 2 3 4 5 6 7 8 9 10
| @Override public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException { if (null == listener) { return; } String clusterString = StringUtils.join(clusters, ","); changeNotifier.registerListener(groupName, serviceName, clusterString, listener); clientProxy.subscribe(serviceName, groupName, clusterString); }
|
这里我们先来看subscribe方法,大家可能有些眼熟它是clientProxy类型调用的方法,实际上就是NamingClientProxyDelegate.subscribe(),所以其实这里和之前的服务发现中调用的是一个方法,这里其实是在做服务列表的查询,所以得出结论查询和订阅都调用了同一个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Override public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName); String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters); serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters); ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey); if (null == result) { result = grpcClientProxy.subscribe(serviceName, groupName, clusters); } serviceInfoHolder.processServiceInfo(result); return result; }
|
但是这里我们要关注这里的任务调度,该方法包含了构建serviceKey、通过serviceKey判断重复、最后添加UpdateTask,而其中的addTask的实现就是发起了一个定时任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) { String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); if (futureMap.get(serviceKey) != null) { return; } synchronized (futureMap) { if (futureMap.get(serviceKey) != null) { return; } ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters)); futureMap.put(serviceKey, future); } }
|
定时任务延迟一秒执行:
1 2 3
| private synchronized ScheduledFuture<?> addTask(UpdateTask task) { return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS); }
|
所以在这里我们得出结论,核心为:调用订阅方法和发起定时任务。
定时任务执行内容
UpdateTask封装了订阅机制的核心业务逻辑,我们来看一下流程图:
当我们知道了整体流程以后,我们再来看对应源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| @Override public void run() { long delayTime = DEFAULT_DELAY;
try { if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) { NAMING_LOGGER .info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters); return; } ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey); if (serviceObj == null) { serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); serviceInfoHolder.processServiceInfo(serviceObj); lastRefTime = serviceObj.getLastRefTime(); return; } if (serviceObj.getLastRefTime() <= lastRefTime) { serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); serviceInfoHolder.processServiceInfo(serviceObj); } lastRefTime = serviceObj.getLastRefTime(); if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE; resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e); } finally { executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } }
|
业务逻辑最后会计算下一次定时任务的执行时间,通过delayTime来延迟执行。delayTime默认为 1000L * 6,也就是6秒。而在finally里面真的发起下一次定时任务。当出现异常时,下次执行的时间与失败次数有关,但最长不超过1分钟。
总结:
订阅方法的调用,并进行EventListener的注册,后面UpdateTask要用来进行判断;
通过委托代理类来处理订阅逻辑,此处与获取实例列表方法使用了同一个方法;
通过定时任务执行UpdateTask方法,默认执行间隔为6秒,当发生异常时会延长,但不超过1分钟;
UpdateTask方法中会比较本地是否存在缓存,缓存是否过期。当不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。
重新计算定时任务时间,循环执行流程。