nacos-discovery Source Code Analysis
Published: 2019-05-24


As usual, start with the AutoConfiguration classes declared in spring.factories:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.alibaba.nacos.NacosDiscoveryAutoConfiguration,\
  org.springframework.cloud.alibaba.nacos.ribbon.RibbonNacosAutoConfiguration,\
  org.springframework.cloud.alibaba.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  org.springframework.cloud.alibaba.nacos.discovery.NacosDiscoveryClientAutoConfiguration

The registration class is NacosAutoServiceRegistration, which extends AbstractAutoServiceRegistration<Registration>. Tracing the code shows that it calls start(), which in turn calls register().

@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(
                ((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
            return;
        }
    }
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}
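To make the control flow of bind() easier to follow, here is a minimal sketch with the Spring types stripped out. The class RegistrationTriggerSketch and its counters are hypothetical, and the idea that start() registers only once is an assumption for illustration rather than something shown in the quoted code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the registration base class; not the Spring Cloud type.
class RegistrationTriggerSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private int registerCalls = 0;

    // Mirrors bind(): skip the management server, record the port, then start.
    void onWebServerInitialized(String serverNamespace, int webServerPort) {
        if ("management".equals(serverNamespace)) {
            return;
        }
        // Only takes effect while no port has been recorded yet (value still 0).
        port.compareAndSet(0, webServerPort);
        start();
    }

    // Assumption: start() performs the registration once and is a no-op afterwards.
    void start() {
        if (running.compareAndSet(false, true)) {
            registerCalls++;
        }
    }

    int port() {
        return port.get();
    }

    int registerCalls() {
        return registerCalls;
    }
}
```

An event from the "management" namespace leaves the port at 0 and triggers nothing, while the first event from the main web server fixes the port and triggers the registration.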

Next, registerInstance() of NacosNamingService in the client is called. beatReactor schedules the heartbeat, and serverProxy registers the instance with the server by sending a POST request to /nacos/v1/ns/instance.
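As a rough illustration of that request, the sketch below only builds the POST (it does not send it) using the JDK's java.net.http types. The class name RegisterRequestSketch is hypothetical, the parameter names serviceName, ip and port follow the Nacos Open API as I understand it, and the real client sends more parameters than these.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; the real client goes through its own proxy class.
class RegisterRequestSketch {
    static HttpRequest build(String serverAddr, String serviceName, String ip, int port) {
        // Assumed minimal parameter set for registering an instance.
        String query = "serviceName=" + URLEncoder.encode(serviceName, StandardCharsets.UTF_8)
                + "&ip=" + URLEncoder.encode(ip, StandardCharsets.UTF_8)
                + "&port=" + port;
        URI uri = URI.create("http://" + serverAddr + "/nacos/v1/ns/instance?" + query);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

For example, RegisterRequestSketch.build("127.0.0.1:8848", "demo-service", "10.0.0.1", 8080) yields a POST to the instance endpoint; the address 127.0.0.1:8848 is just an assumed local server.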

Following that path leads to InstanceController in the server's naming module.

The controller hands the registration over to serviceManager.

The createEmptyService method creates the service. service.init() starts the health check through HealthCheckReactor, and putService(service) stores the service in serviceMap.

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(Constants.DEFAULT_GROUP);
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        service.validate();
        if (local) {
            putService(service);
            service.init();
            consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
            consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        } else {
            addOrReplaceService(service);
        }
    }
}

Tracing addInstance() further leads to the put method of the DelegateConsistencyServiceImpl adapter, and finally to the put method of DistroConsistencyServiceImpl, which implements EphemeralConsistencyService.

In the onPut method, a task is added to the Notifier. Inside Notifier, the onChange method of the matching RecordListener implementation is called back.
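The enqueue-then-callback pattern described here can be sketched as follows. ChangeListener and NotifierSketch are hypothetical, simplified stand-ins: the real RecordListener and Notifier carry more state, and the real Notifier consumes its queue on a dedicated thread rather than through an explicit drain() call.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified listener interface.
interface ChangeListener {
    void onChange(String key, String value);
}

class NotifierSketch {
    private final Map<String, List<ChangeListener>> listeners = new ConcurrentHashMap<>();
    private final BlockingQueue<String[]> tasks = new LinkedBlockingQueue<>();

    // Registers a listener for one key.
    void listen(String key, ChangeListener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // onPut only enqueues a task; no listener runs on the caller's thread here.
    void onPut(String key, String value) {
        tasks.offer(new String[] {key, value});
    }

    // Processes all pending tasks and returns how many callbacks were made.
    int drain() {
        int notified = 0;
        String[] task;
        while ((task = tasks.poll()) != null) {
            for (ChangeListener listener : listeners.getOrDefault(task[0], List.of())) {
                listener.onChange(task[0], task[1]);
                notified++;
            }
        }
        return notified;
    }
}
```

Decoupling the write path from the callbacks this way means a slow listener does not block the put call itself.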

RecordListener is implemented by ServiceManager, Service, and others.

On registration or modification, the callback that fires is the onChange method of the Service class.

@Override
public void onChange(String key, Instances value) throws Exception {
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    for (Instance instance : value.getInstanceList()) {
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }

    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    recalculateChecksum();
}
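The weight normalization in onChange can be isolated into a small pure function. The class and constant names below are hypothetical; the thresholds are the ones that appear in the quoted code.

```java
// Hypothetical helper that reproduces the two weight checks from onChange.
class WeightClampSketch {
    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_POSITIVE_WEIGHT = 0.01D;

    static double clamp(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT;
        }
        // Only tiny positive weights are raised; 0 and negatives pass through unchanged.
        if (weight < MIN_POSITIVE_WEIGHT && weight > 0.0D) {
            return MIN_POSITIVE_WEIGHT;
        }
        return weight;
    }
}
```

So a weight of 20000 becomes 10000, a weight of 0.001 becomes 0.01, and a weight of exactly 0 is left as it is by these two checks.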

updateIPs updates the instances and the Clusters, and PushService.serviceChanged pushes the change to clients over a UDP socket.

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }

    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }

            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }

            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                    instance.getClusterName(), instance.toJSON());
                Cluster cluster = new Cluster(instance.getClusterName());
                cluster.setService(this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }

            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }

            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }

    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral);
    }

    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(namespaceId, getName());

    StringBuilder stringBuilder = new StringBuilder();
    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString());
}
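The grouping step at the start of updateIPs is worth isolating: every known cluster starts with an empty list, so a cluster that receives no instances in the new list ends up cleared. The sketch below is a simplified illustration; ClusterGroupingSketch and Inst are hypothetical, and "DEFAULT" is my assumption for the value of UtilsAndCommons.DEFAULT_CLUSTER_NAME.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ClusterGroupingSketch {
    // Assumed value of UtilsAndCommons.DEFAULT_CLUSTER_NAME.
    static final String DEFAULT_CLUSTER = "DEFAULT";

    // Hypothetical minimal instance: an address plus an optional cluster name.
    static final class Inst {
        final String addr;
        final String cluster;

        Inst(String addr, String cluster) {
            this.addr = addr;
            this.cluster = cluster;
        }
    }

    static Map<String, List<String>> group(List<String> knownClusters, List<Inst> instances) {
        Map<String, List<String>> ipMap = new LinkedHashMap<>();
        // Seed every known cluster with an empty list so stale entries get replaced.
        for (String cluster : knownClusters) {
            ipMap.put(cluster, new ArrayList<>());
        }
        for (Inst instance : instances) {
            if (instance == null) {
                continue; // malformed entry, skipped like in the original
            }
            String cluster = (instance.cluster == null || instance.cluster.isEmpty())
                    ? DEFAULT_CLUSTER
                    : instance.cluster;
            // Unknown clusters are created on the fly.
            ipMap.computeIfAbsent(cluster, k -> new ArrayList<>()).add(instance.addr);
        }
        return ipMap;
    }
}
```

With a known cluster "c1" and two instances (one without a cluster name, one in "c2"), the result maps "c1" to an empty list and the two instances to "DEFAULT" and "c2".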

That is all for registration. Next comes heartbeat detection, which mainly lives in NacosWatch:

NamingService namingService = properties.namingServiceInstance();

ListView<String> listView = properties.namingServiceInstance()
        .getServicesOfServer(1, Integer.MAX_VALUE);

List<String> serviceList = listView.getData();

// if there are new services found, publish event
Set<String> currentServices = new HashSet<>(serviceList);
currentServices.removeAll(cacheServices);
if (currentServices.size() > 0) {
    changed = true;
}

// if some services disappear, publish event
if (cacheServices.removeAll(new HashSet<>(serviceList))
        && cacheServices.size() > 0) {
    changed = true;

    for (String serviceName : cacheServices) {
        namingService.unsubscribe(serviceName,
                subscribeListeners.get(serviceName));
        subscribeListeners.remove(serviceName);
    }
}

cacheServices = new HashSet<>(serviceList);

// subscribe services's node change, publish event if nodes changed
for (String serviceName : cacheServices) {
    if (!subscribeListeners.containsKey(serviceName)) {
        EventListener eventListener = event -> NacosWatch.this.publisher
                .publishEvent(new HeartbeatEvent(NacosWatch.this,
                        nacosWatchIndex.getAndIncrement()));
        subscribeListeners.put(serviceName, eventListener);
        namingService.subscribe(serviceName, eventListener);
    }
}

if (changed) {
    this.publisher.publishEvent(
            new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}

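It fetches the set of service names from the server, compares it with the local cache, and updates the cache. When something has changed, it subscribes or unsubscribes accordingly and then publishes an event. Tracing the server-side call shows that it fetches the services of a namespace: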
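The comparison between the cached and the freshly fetched service names boils down to two set differences. The sketch below is an illustration only; ServiceDiffSketch is a hypothetical helper and works on copies instead of mutating the cache the way the quoted code does.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper that computes what appeared and what disappeared.
class ServiceDiffSketch {
    // Services present on the server but not yet in the local cache.
    static Set<String> added(Set<String> cached, Set<String> current) {
        Set<String> result = new HashSet<>(current);
        result.removeAll(cached);
        return result;
    }

    // Services in the local cache that the server no longer reports.
    static Set<String> removed(Set<String> cached, Set<String> current) {
        Set<String> result = new HashSet<>(cached);
        result.removeAll(current);
        return result;
    }

    static boolean changed(Set<String> cached, Set<String> current) {
        return !added(cached, current).isEmpty() || !removed(cached, current).isEmpty();
    }
}
```

Note that Set.removeAll returns true only when the receiving set was actually modified, so the condition in the quoted code depends on the two sets overlapping; the sketch computes both differences explicitly instead.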

List<String> serviceNameList = serviceManager.getAllServiceNameList(namespaceId);

JSONObject result = new JSONObject();

if (serviceNameList == null || serviceNameList.isEmpty()) {
    result.put("doms", new ArrayList<String>(1));
    result.put("count", 0);
    return result;
}

Iterator<String> iterator = serviceNameList.iterator();
while (iterator.hasNext()) {
    String serviceName = iterator.next();
    if (!serviceName.startsWith(groupName + Constants.SERVICE_INFO_SPLITER)) {
        iterator.remove();
    }
}
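The group filter in that loop can be sketched on its own. GroupFilterSketch is hypothetical, and "@@" is my assumption for the value of Constants.SERVICE_INFO_SPLITER; the sketch returns a filtered copy rather than removing through an iterator.

```java
import java.util.ArrayList;
import java.util.List;

class GroupFilterSketch {
    // Assumed value of Constants.SERVICE_INFO_SPLITER.
    static final String SPLITTER = "@@";

    // Keeps only the names that belong to the given group.
    static List<String> filterByGroup(List<String> serviceNames, String groupName) {
        List<String> result = new ArrayList<>(serviceNames);
        result.removeIf(name -> !name.startsWith(groupName + SPLITTER));
        return result;
    }
}
```

Including the splitter in the prefix matters: with the group DEFAULT_GROUP, a name such as DEFAULT_GROUP2@@c is rejected even though it starts with the bare group name.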

Next is service discovery. It ultimately comes down to obtaining the ServerList; in nacos this is the NacosServerList class, which queries all healthy instances.

private List<NacosServer> getServers() {
    try {
        List<Instance> instances = discoveryProperties.namingServiceInstance()
                .selectInstances(serviceId, true);
        return instancesToServerList(instances);
    } catch (Exception e) {
        throw new IllegalStateException(
                "Can not get service instances from nacos, serviceId=" + serviceId,
                e);
    }
}

private List<NacosServer> instancesToServerList(List<Instance> instances) {
    List<NacosServer> result = new ArrayList<>();
    if (null == instances) {
        return result;
    }
    for (Instance instance : instances) {
        result.add(new NacosServer(instance));
    }

    return result;
}

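Tracing selectInstances leads into HostReactor.

If the entry cannot be found in serviceInfoMap, a new serviceInfo is put into both serviceInfoMap and updatingMap.

If the entry is found in serviceInfoMap and is also present in updatingMap, the caller waits for 5 seconds.

Finally, scheduleUpdateIfAbsent records the entry in futureMap and adds an UpdateTask to the scheduled executor.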

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }

    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}
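The goal of the double-checked block is that each service/cluster key gets exactly one update task. As an alternative illustration of the same guarantee (not the approach used in the quoted code), the sketch below relies on ConcurrentHashMap.computeIfAbsent. ScheduleOnceSketch is hypothetical, the key format is a placeholder for ServiceInfo.getKey, and a counter stands in for the real ScheduledFuture.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduleOnceSketch {
    // Maps a key to a task id; the real map holds ScheduledFuture objects.
    private final Map<String, Integer> futureMap = new ConcurrentHashMap<>();
    private final AtomicInteger scheduled = new AtomicInteger();

    void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        // Placeholder key format; the real code calls ServiceInfo.getKey.
        String key = serviceName + "@@" + clusters;
        // The mapping function runs at most once per key, even under contention.
        futureMap.computeIfAbsent(key, k -> scheduled.incrementAndGet());
    }

    int scheduledCount() {
        return scheduled.get();
    }
}
```

Calling scheduleUpdateIfAbsent twice with the same arguments schedules a single task; a different service name schedules a second one.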

The run method of UpdateTask:

public void run() {
    try {
        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

        if (serviceObj == null) {
            updateServiceNow(serviceName, clusters);
            executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
            return;
        }

        if (serviceObj.getLastRefTime() <= lastRefTime) {
            updateServiceNow(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            refreshOnly(serviceName, clusters);
        }

        executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);

        lastRefTime = serviceObj.getLastRefTime();
    } catch (Throwable e) {
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    }
}
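The three branches of run() can be read as a small decision function. The sketch below is only a restatement for illustration; RefreshDecisionSketch and its Action names are hypothetical, and a null first argument stands for "no cached ServiceInfo".

```java
// Hypothetical restatement of the branching in UpdateTask.run().
class RefreshDecisionSketch {
    enum Action {
        PULL_AND_RETRY_AFTER_DEFAULT_DELAY, // nothing cached yet
        PULL,                               // cache not refreshed since the last run
        REFRESH_ONLY                        // cache was already updated by a push
    }

    static Action decide(Long cachedLastRefTime, long taskLastRefTime) {
        if (cachedLastRefTime == null) {
            return Action.PULL_AND_RETRY_AFTER_DEFAULT_DELAY;
        }
        if (cachedLastRefTime <= taskLastRefTime) {
            return Action.PULL;
        }
        return Action.REFRESH_ONLY;
    }
}
```

In other words, the task pulls from the server unless the cached entry carries a newer reference time than the one the task saw last, which indicates that a push already delivered fresher data.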

If the serviceInfo does not exist or is not up to date, updateServiceNow fetches it:

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getSerivceInfo0(serviceName, clusters);
    try {
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

To recap: getServiceInfo adds an entry to serviceInfoMap when none exists, and the scheduled task also fetches the data periodically and updates the value in serviceInfoMap.

 

More on the design and implementation of the nacos server will be added in a later post.

Reposted from: http://pgadi.baihongyu.com/
