本文共 11676 字,大约阅读时间需要 38 分钟。
首先还是Spring.factories中的AutoConfiguration类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.alibaba.nacos.NacosDiscoveryAutoConfiguration,\ org.springframework.cloud.alibaba.nacos.ribbon.RibbonNacosAutoConfiguration,\ org.springframework.cloud.alibaba.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ org.springframework.cloud.alibaba.nacos.discovery.NacosDiscoveryClientAutoConfiguration
注册类NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration>,追踪代码发现调用start(),再次后register()
@Override @SuppressWarnings("deprecation") public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); } @Deprecated public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals( ((ConfigurableWebServerApplicationContext) context).getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); this.start(); }
之后会调用client中NacosNamingService的registerInstance(),beatReactor加入心跳,serverProxy注册到服务器,请求路径是
/nacos/v1/ns/instance 的post方法
根据路径找到服务器naming模块中的InstanceController
之后是交给了serviceManager进行注册
createEmptyService方法进行了service的创建,service.init()进行了HealthCheckReactor健康检查,同时putService(service),放入serviceMap
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null) { Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(Constants.DEFAULT_GROUP); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); service.validate(); if (local) { putService(service); service.init(); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); } else { addOrReplaceService(service); } } }
addInstance()方法继续追踪到DelegateConsistencyServiceImpl适配器的put方法,最后是implements EphemeralConsistencyService的DistroConsistencyServiceImpl的put方法
在onput方法中,Notifier加入Task,继续看Notifier中,回调了RecordListener的具体对应实现类的onChange方法
看谁实现了RecordListener类,ServiceManager,Service等都实现了。
在注册或者修改时,都是触发回调了service类的onchange方法
@Override public void onChange(String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); for (Instance instance : value.getInstanceList()) { if (instance == null) { // Reject this abnormal instance list: throw new RuntimeException("got null instance " + key); } if (instance.getWeight() > 10000.0D) { instance.setWeight(10000.0D); } if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) { instance.setWeight(0.01D); } } updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum(); }
updateIPs,进行instance,Cluster的更新,PushService.serviceChanged进行信息同步,udpSocket的方式
public void updateIPs(Collectioninstances, boolean ephemeral) { Map > ipMap = new HashMap<>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJSON()); Cluster cluster = new Cluster(instance.getClusterName()); cluster.setService(this); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } List clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } for (Map.Entry > entry : ipMap.entrySet()) { //make every ip mine List entryIPs = entry.getValue(); clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral); } setLastModifiedMillis(System.currentTimeMillis()); getPushService().serviceChanged(namespaceId, getName()); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(","); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString()); }
注册,就先到这里。再看心跳检测,主要是NacosWatch中
NamingService namingService = properties.namingServiceInstance(); ListViewlistView = properties.namingServiceInstance() .getServicesOfServer(1, Integer.MAX_VALUE); List serviceList = listView.getData(); // if there are new services found, publish event Set currentServices = new HashSet<>(serviceList); currentServices.removeAll(cacheServices); if (currentServices.size() > 0) { changed = true; } // if some services disappear, publish event if (cacheServices.removeAll(new HashSet<>(serviceList)) && cacheServices.size() > 0) { changed = true; for (String serviceName : cacheServices) { namingService.unsubscribe(serviceName, subscribeListeners.get(serviceName)); subscribeListeners.remove(serviceName); } } cacheServices = new HashSet<>(serviceList); // subscribe services's node change, publish event if nodes changed for (String serviceName : cacheServices) { if (!subscribeListeners.containsKey(serviceName)) { EventListener eventListener = event -> NacosWatch.this.publisher .publishEvent(new HeartbeatEvent(NacosWatch.this, nacosWatchIndex.getAndIncrement())); subscribeListeners.put(serviceName, eventListener); namingService.subscribe(serviceName, eventListener); } } if (changed) { this.publisher.publishEvent( new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement())); }
获取服务端的serviceName结合,与本地进行比对,更新本地,有变化时进行相关的订阅或者取消订阅,然后发布event。获取服务器追踪后代码为获取namespace的service
ListserviceNameList = serviceManager.getAllServiceNameList(namespaceId); JSONObject result = new JSONObject(); if (serviceNameList == null || serviceNameList.isEmpty()) { result.put("doms", new ArrayList (1)); result.put("count", 0); return result; } Iterator iterator = serviceNameList.iterator(); while (iterator.hasNext()) { String serviceName = iterator.next(); if (!serviceName.startsWith(groupName + Constants.SERVICE_INFO_SPLITER)) { iterator.remove(); } }
之后看服务发现,服务发现最后主要是获取ServerList,nacos中是NacosServerList类,查询所有健康的instance
private ListgetServers() { try { List instances = discoveryProperties.namingServiceInstance() .selectInstances(serviceId, true); return instancesToServerList(instances); } catch (Exception e) { throw new IllegalStateException( "Can not get service instances from nacos, serviceId=" + serviceId, e); } } private List instancesToServerList(List instances) { List result = new ArrayList<>(); if (null == instances) { return result; } for (Instance instance : instances) { result.add(new NacosServer(instance)); } return result; }
追踪selectInstances,到HostReactor中
如果serviceInfoMap中获取不到,则维护一个serviceInfo到serviceInfoMap updatingMap
如果serviceInfoMap,updatingMap均获取到了,则wait 5秒
最后是scheduleUpdateIfAbsent,维护到futureMap,同时添加一个UpdateTask到定时任务中了
public void scheduleUpdateIfAbsent(String serviceName, String clusters) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } synchronized (futureMap) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } ScheduledFuture future = addTask(new UpdateTask(serviceName, clusters)); futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } }
UpdateTask的 run
public void run() { try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); if (serviceObj == null) { updateServiceNow(serviceName, clusters); executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS); return; } if (serviceObj.getLastRefTime() <= lastRefTime) { updateServiceNow(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push refreshOnly(serviceName, clusters); } executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS); lastRefTime = serviceObj.getLastRefTime(); } catch (Throwable e) { NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } }
如果serviceInfo不存在,或者不是最新的,则通过updateServiceNow去获取
public void updateServiceNow(String serviceName, String clusters) { ServiceInfo oldService = getSerivceInfo0(serviceName, clusters); try { String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false); if (StringUtils.isNotEmpty(result)) { processServiceJSON(result); } } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } }
之前上面serviceInfoMap在getServiceInfo时如果不存在则添加,同时也通过定时任务进行获取,更新serviceInfoMap值。
后续更多nacos服务端的设计以及实现原理,后续再补充。
转载地址:http://pgadi.baihongyu.com/