注册发现流程
客户端
注册自身服务
nacos客户端监听了启动事件,调用start()方法,将注册信息封装成instance对象(ip、port、weight、group、namespace……),再用nacosRestTemplate发送REST请求到服务端
spring.factories
监听事件
start()方法
将instance放到map中去
发送rest请求
服务端接口地址
与官网手册的注册接口一致
服务端
查找接口/nacos/v1/ns/instance 的post方法
创建service时,内部有一个双层map:外层按命名空间(namespace)隔离,内层以 group::serviceName 为key保存Service,用来隔离命名空间和group
- /**
- * Map(namespace, Map(group::serviceName, Service)).
- */
- private final Map
> serviceMap = new ConcurrentHashMap<>();
- /*
- * Copyright 1999-2018 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- package com.alibaba.nacos.naming.consistency.ephemeral.distro;
-
- import com.alibaba.nacos.api.common.Constants;
- import com.alibaba.nacos.api.exception.NacosException;
- import com.alibaba.nacos.consistency.DataOperation;
- import com.alibaba.nacos.core.distributed.distro.DistroConfig;
- import com.alibaba.nacos.core.distributed.distro.DistroProtocol;
- import com.alibaba.nacos.core.distributed.distro.component.DistroDataProcessor;
- import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
- import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
- import com.alibaba.nacos.naming.cluster.ServerStatus;
- import com.alibaba.nacos.naming.cluster.transport.Serializer;
- import com.alibaba.nacos.naming.consistency.Datum;
- import com.alibaba.nacos.naming.consistency.KeyBuilder;
- import com.alibaba.nacos.naming.consistency.RecordListener;
- import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
- import com.alibaba.nacos.naming.consistency.ephemeral.distro.combined.DistroHttpCombinedKey;
- import com.alibaba.nacos.naming.core.DistroMapper;
- import com.alibaba.nacos.naming.core.Instances;
- import com.alibaba.nacos.naming.core.Service;
- import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
- import com.alibaba.nacos.naming.misc.GlobalConfig;
- import com.alibaba.nacos.naming.misc.GlobalExecutor;
- import com.alibaba.nacos.naming.misc.Loggers;
- import com.alibaba.nacos.naming.misc.SwitchDomain;
- import com.alibaba.nacos.naming.pojo.Record;
- import com.alibaba.nacos.sys.utils.ApplicationUtils;
- import com.alibaba.nacos.common.utils.StringUtils;
- import org.javatuples.Pair;
- import org.springframework.context.annotation.DependsOn;
-
- import javax.annotation.PostConstruct;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.Optional;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentLinkedQueue;
-
-
- @DependsOn("ProtocolManager")
- @org.springframework.stereotype.Service("distroConsistencyService")
- public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
-
- private static final String ON_RECEIVE_CHECKSUMS_PROCESSING_TAG = "1";
-
- private final DistroMapper distroMapper;
-
- private final DataStore dataStore;
-
- private final Serializer serializer;
-
- private final SwitchDomain switchDomain;
-
- private final GlobalConfig globalConfig;
-
- private final DistroProtocol distroProtocol;
-
- private volatile Notifier notifier = new Notifier();
-
- private Map
> listeners = new ConcurrentHashMap<>(); -
- private Map
syncChecksumTasks = new ConcurrentHashMap<>(16); -
- public DistroConsistencyServiceImpl(DistroMapper distroMapper, DataStore dataStore, Serializer serializer,
- SwitchDomain switchDomain, GlobalConfig globalConfig, DistroProtocol distroProtocol) {
- this.distroMapper = distroMapper;
- this.dataStore = dataStore;
- this.serializer = serializer;
- this.switchDomain = switchDomain;
- this.globalConfig = globalConfig;
- this.distroProtocol = distroProtocol;
- }
-
- //初始化加载
- @PostConstruct
- public void init() {
- GlobalExecutor.submitDistroNotifyTask(notifier);
- }
-
- @Override
- public void put(String key, Record value) throws NacosException {
- //单机注册
- onPut(key, value);
- // If upgrade to 2.0.X, do not sync for v1.
- if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
- return;
- }
- distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
- DistroConfig.getInstance().getSyncDelayMillis());
- }
-
- @Override
- public void remove(String key) throws NacosException {
- onRemove(key);
- listeners.remove(key);
- }
-
- @Override
- public Datum get(String key) throws NacosException {
- return dataStore.get(key);
- }
-
- /**
- * Put a new record.
- *
- * @param key key of record
- * @param value record
- */
- public void onPut(String key, Record value) {
-
- if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
- Datum
datum = new Datum<>(); - datum.value = (Instances) value;
- datum.key = key;
- datum.timestamp.incrementAndGet();
- dataStore.put(key, datum);
- }
-
- if (!listeners.containsKey(key)) {
- return;
- }
-
- //丢进阻塞队列中去
- notifier.addTask(key, DataOperation.CHANGE);
- }
-
-
-
-
- public boolean isInitialized() {
- return distroProtocol.isInitialized() || !globalConfig.isDataWarmup();
- }
-
- public class Notifier implements Runnable {
-
- private ConcurrentHashMap
services = new ConcurrentHashMap<>(10 * 1024); -
- private BlockingQueue
> tasks = new ArrayBlockingQueue<>(1024 * 1024); -
- /**
- * Add new notify task to queue.
- *
- * @param datumKey data key
- * @param action action for data
- */
- public void addTask(String datumKey, DataOperation action) {
-
- if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
- return;
- }
- if (action == DataOperation.CHANGE) {
- services.put(datumKey, StringUtils.EMPTY);
- }
- tasks.offer(Pair.with(datumKey, action));
- }
-
- public int getTaskSize() {
- return tasks.size();
- }
-
- @Override
- public void run() {
- Loggers.DISTRO.info("distro notifier started");
-
- for (; ; ) {
- try {
- Pair
pair = tasks.take(); - // 注册流程
- handle(pair);
- } catch (Throwable e) {
- Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
- }
- }
- }
-
- //注册核心流程
- private void handle(Pair
pair) { - try {
- String datumKey = pair.getValue0();
- DataOperation action = pair.getValue1();
-
- services.remove(datumKey);
-
- int count = 0;
-
- if (!listeners.containsKey(datumKey)) {
- return;
- }
-
- for (RecordListener listener : listeners.get(datumKey)) {
-
- count++;
-
- try {
- if (action == DataOperation.CHANGE) {
- //注册核心流程
- listener.onChange(datumKey, dataStore.get(datumKey).value);
- continue;
- }
-
- if (action == DataOperation.DELETE) {
- listener.onDelete(datumKey);
- continue;
- }
- } catch (Throwable e) {
- Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
- }
- }
-
- if (Loggers.DISTRO.isDebugEnabled()) {
- Loggers.DISTRO
- .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
- datumKey, count, action.name());
- }
- } catch (Throwable e) {
- Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
- }
- }
- }
- }
listener.onChange(datumKey, dataStore.get(datumKey).value); 方法内部最终会调用到写时复制方法 public void updateIps(List<Instance> ips, boolean ephemeral)
- //写时复制代码
- public void updateIps(List
ips, boolean ephemeral) { -
- //是不是临时节点,是复制ephemeralInstances一份出来,ephemeralInstances是nacos所有临时节点
- Set
toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; -
- //写入代码...
-
- //复制代码
- if (ephemeral) {
- ephemeralInstances = toUpdateInstances;
- } else {
- persistentInstances = toUpdateInstances;
- }
- }