
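Continuing from Part 1: an analysis of the full service registration process, from the Eureka Client sending a registration request to the Eureka Server handling it.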




I. Spring Cloud Eureka Server auto-configuration and initialization

@EnableEurekaServer

To create a Spring Cloud Eureka Server, you first add the @EnableEurekaServer annotation, which is essentially:

@EnableDiscoveryClient
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}
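  • @EnableDiscoveryClient: brings in the service-discovery client configuration (besides acting as a server, the node also acts as a client when replicating within a server cluster)
  • Imports EurekaServerMarkerConfiguration: activates EurekaServerAutoConfiguration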

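So both @EnableEurekaServer and the client-side annotation analyzed in the previous article activate an xxAutoConfiguration class by registering a Marker bean in the Spring container: for the Eureka Client this is EurekaClientAutoConfiguration, for the Eureka Server it is EurekaServerAutoConfiguration.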
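To make the Marker trick concrete: as far as I know, EurekaServerAutoConfiguration carries @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class), so it only takes effect once the marker bean has been registered. The plain-Java simulation below involves no Spring at all; the names ServerMarker, MarkerActivationDemo, importMarkerConfiguration and shouldApplyServerAutoConfiguration are hypothetical and only model the "register a marker, then check for it" handshake.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for EurekaServerMarkerConfiguration.Marker: an empty class
// whose only purpose is to exist as a bean.
final class ServerMarker {}

// Plain-Java simulation of marker-based activation (not Spring's actual API).
class MarkerActivationDemo {
    // Stands in for the bean definitions registered in the container.
    private final Set<Class<?>> registeredBeans = new HashSet<>();

    // What importing the marker configuration contributes: exactly one marker bean.
    void importMarkerConfiguration() {
        registeredBeans.add(ServerMarker.class);
    }

    // Models @ConditionalOnBean(Marker.class) on the auto-configuration class:
    // the auto-configuration is applied only if the marker bean is present.
    boolean shouldApplyServerAutoConfiguration() {
        return registeredBeans.contains(ServerMarker.class);
    }
}
```

In this model, without @EnableEurekaServer the marker is missing, so the server auto-configuration is skipped even though its class is on the classpath.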


EurekaServerAutoConfiguration - the registry server's auto-configuration class

A brief look at each component it auto-configures:

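  • Class-level annotations

    • @Import(EurekaServerInitializerConfiguration.class): imports the Eureka Server initializer configuration. It implements SmartLifecycle, so EurekaServerBootstrap#contextInitialized() is called once the Spring container has essentially finished refreshing (a key point in the Eureka Server startup analysis)

    • @EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })

      • EurekaDashboardProperties holds the dashboard-related properties

      • InstanceRegistryProperties holds the instance-registry-related properties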

        @ConfigurationProperties(PREFIX)
        public class InstanceRegistryProperties {
        
          public static final String PREFIX = "eureka.instance.registry";
        
        
          /* Default number of expected renews per minute, defaults to 1.
           * Setting expectedNumberOfRenewsPerMin to non-zero to ensure that even an isolated
           * server can adjust its eviction policy to the number of registrations (when it's
           * zero, even a successful registration won't reset the rate threshold in
           * InstanceRegistry.register()).
           */
          @Value("${eureka.server.expectedNumberOfRenewsPerMin:1}") // for backwards compatibility
          private int expectedNumberOfRenewsPerMin = 1;
        
          /**
           * Value used in determining when leases are cancelled, default to 1 for standalone.
           * Should be set to 0 for peer replicated eurekas
           */
          @Value("${eureka.server.defaultOpenForTrafficCount:1}") // for backwards compatibility
          private int defaultOpenForTrafficCount = 1;

          // ... remaining members omitted
        }
    • @PropertySource("classpath:/eureka/server.properties"): this file ships in spring-cloud-netflix-eureka-server-xxx.jar and contains only spring.http.encoding.force=false

  • EurekaServerFeature: the /features endpoint lists EurekaServerAutoConfiguration as the enabled Eureka Server auto-configuration

  • EurekaServerConfig: registers the Eureka Server configuration. EurekaServerConfig is a Netflix interface holding the many settings a Eureka server needs at runtime; Netflix's default implementation is DefaultEurekaServerConfig, Spring Cloud's is EurekaServerConfigBean

    @Configuration
    protected static class EurekaServerConfigBeanConfiguration {
      @Bean
      @ConditionalOnMissingBean
      public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
          EurekaServerConfigBean server = new EurekaServerConfigBean();  // create the EurekaServerConfigBean
          // If this Eureka Server also registers itself with Eureka as a client
          // (which a clustered deployment presumably needs)
          if (clientConfig.shouldRegisterWithEureka()) {
              // Set a sensible default if we are supposed to replicate:
              // number of times this node retries fetching the registry from its peers during startup
              server.setRegistrySyncRetries(5);
          }
          return server;
      }
    }
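  • EurekaController: the controller behind the Eureka Server dashboard (default path: /)

  • PeerAwareInstanceRegistry: literally a "peer-aware instance registry", i.e. a registry that, when registering instances, also takes care of the corresponding operations on the other nodes in the cluster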

    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
          ServerCodecs serverCodecs) {
      this.eurekaClient.getApplications(); // force initialization
                                             // forces eurekaClient to be created; the same trick appeared earlier when investigating the RefreshScope bug
    
        // Create an InstanceRegistry (Spring Cloud's implementation),
        // which extends PeerAwareInstanceRegistryImpl, the implementation of the PeerAwareInstanceRegistry interface
      return new InstanceRegistry(
                this.eurekaServerConfig, this.eurekaClientConfig,
              serverCodecs, this.eurekaClient,
              this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
              this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }
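  • PeerEurekaNodes: helper class that manages the PeerEurekaNode instances

    • start() creates a single-threaded scheduled executor (newSingleThreadScheduledExecutor) that periodically refreshes the PeerNode list. The thread is named "Eureka-PeerNodesUpdater" and runs every EurekaServerConfigBean.peerEurekaNodesUpdateIntervalMs = 10 * MINUTES. It is invoked when DefaultEurekaServerContext's @PostConstruct method initialize() calls peerEurekaNodes.start()
    • updatePeerEurekaNodes(resolvePeerUrls()): the core logic the scheduled thread uses to refresh the PeerNode list
      • resolvePeerUrls(): resolves the URLs of the other Eureka Server nodes. Based on the current server's zone and the shouldPreferSameZoneEureka setting, it obtains a sorted replicaUrls list, then removes this server's own URL if it is present
      • updatePeerEurekaNodes(): compares the existing PeerEurekaNodes.peerEurekaNodeUrls in memory with the newPeerUrls from the previous step, removing nodes that are gone and adding new ones. It does not simply replace the list with newPeerUrls because removed nodes must be closed with PeerEurekaNode#shutDown() and new ones created with PeerEurekaNodes#createPeerEurekaNode()
  • EurekaServerContext (a key point in the Eureka Server startup analysis)

    • The Eureka Server context interface: it has initialize() and shutdown() methods, plus the EurekaServerConfig configuration, the PeerEurekaNodes node-management helper, the PeerAwareInstanceRegistry peer-aware instance registry, and the ApplicationInfoManager that manages this instance's info (initialized from the client configuration)

    • Default implementation: com.netflix.eureka.DefaultEurekaServerContext

    • The initialization logic lives in a @PostConstruct method, so initialize() runs after DefaultEurekaServerContext has been constructed and its dependencies injected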

      @PostConstruct
      @Override
      public void initialize() throws Exception {
          logger.info("Initializing ...");
      
          // Start the PeerEurekaNodes helper
          // (this launches the scheduled thread that refreshes the PeerNode list)
          peerEurekaNodes.start();
      
          // Initialize the PeerAwareInstanceRegistry: starts the numberOfReplicationsLastMin timer,
          // initializedResponseCache(), scheduleRenewalThresholdUpdateTask(), initRemoteRegionRegistry(), and registers JMX monitoring
          registry.init(peerEurekaNodes);
      
          logger.info("Initialized");
      }
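  • EurekaServerBootstrap: the Eureka Server bootstrap. Once the Spring container has essentially finished refresh(), eurekaServerBootstrap.contextInitialized() is actually invoked by EurekaServerInitializerConfiguration (from the background thread started in its start() method); it runs initEurekaEnvironment() and initEurekaServerContext() (a key point in the Eureka Server startup analysis)

  • Registers the Jersey filter: every /eureka request goes through the Jersey filter, whose implementation class com.sun.jersey.spi.container.servlet.ServletContainer is both a Filter and a Servlet and contains Jersey's request-processing logic. At construction time, classes in the com.netflix.discovery and com.netflix.eureka packages are loaded as request-handling resources, such as com.netflix.eureka.resources.ApplicationResource, which handles requests for a single application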


[Key point 1] Eureka Server context initialization

First, here is how Netflix's EurekaServerContext interface is defined:

public interface EurekaServerContext {

    void initialize() throws Exception;

    void shutdown() throws Exception;

    EurekaServerConfig getServerConfig();

    PeerEurekaNodes getPeerEurekaNodes();

    ServerCodecs getServerCodecs();

    PeerAwareInstanceRegistry getRegistry();

    ApplicationInfoManager getApplicationInfoManager();

}

Besides initialize() and shutdown(), it exposes several components: EurekaServerConfig, PeerEurekaNodes, ServerCodecs, PeerAwareInstanceRegistry and ApplicationInfoManager. All of them are already set when the auto-configuration constructs DefaultEurekaServerContext:

@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
                           ServerCodecs serverCodecs,
                           PeerAwareInstanceRegistry registry,
                           PeerEurekaNodes peerEurekaNodes,
                           ApplicationInfoManager applicationInfoManager) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    this.registry = registry;
    this.peerEurekaNodes = peerEurekaNodes;
    this.applicationInfoManager = applicationInfoManager;
}

Next comes the initialization method triggered by @PostConstruct:

@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}

It mainly calls the initialization methods of two components: PeerEurekaNodes and PeerAwareInstanceRegistry


1. PeerEurekaNodes#start(): initializing the peer node information

public void start() {
    // Single-threaded scheduled executor running in the background; thread name: Eureka-PeerNodesUpdater
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );

    try {
        // Resolve the Eureka Server URLs and update the PeerEurekaNodes list
        updatePeerEurekaNodes(resolvePeerUrls());

        // Schedule peersUpdateTask (every 10 min by default, configured by peerEurekaNodesUpdateIntervalMs)
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    // The scheduled task does the same: resolve the Eureka Server URLs and update the PeerEurekaNodes list
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }

    // Log the peer nodes (the current node itself should not be among them)
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  " + node.getServiceUrl());
    }
}

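PeerEurekaNodes#start() mainly does two things:

  • Updates the PeerEurekaNodes list from the configuration
  • Starts peersUpdateTask, which periodically refreshes the PeerEurekaNodes list, on the thread named "Eureka-PeerNodesUpdater"


resolvePeerUrls(): resolving the configured peer URLs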

protected List<String> resolvePeerUrls() {
    // InstanceInfo of this Eureka Server itself
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    // Zone of this Eureka Server, defaultZone by default
    String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
    // Get the configured service-url list
    List<String> replicaUrls = EndpointUtils
            .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

    // Walk the service-urls and exclude this server's own URL
    int idx = 0;
    while (idx < replicaUrls.size()) {
        if (isThisMyUrl(replicaUrls.get(idx))) {
            replicaUrls.remove(idx);
        } else {
            idx++;
        }
    }
    return replicaUrls;
}

How does isThisMyUrl() recognize this server's own URL so that it can be excluded?

public boolean isThisMyUrl(String url) {
    return isInstanceURL(url, applicationInfoManager.getInfo());
}


public boolean isInstanceURL(String url, InstanceInfo instance) {
    // Host taken from the configured URL
    String hostName = hostFromUrl(url);

    // Host taken from this Eureka Server's InstanceInfo
    String myInfoComparator = instance.getHostName();

    // If eureka.client.transport.applicationsResolverUseIp == true, URLs are resolved by IP,
    // so compare against this Eureka Server instance's IP address instead
    if (clientConfig.getTransportConfig().applicationsResolverUseIp()) {
        myInfoComparator = instance.getIPAddr();
    }

    // Compare the configured hostName with this Eureka Server instance's value
    return hostName != null && hostName.equals(myInfoComparator);
}

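The hostName from the configured URL is essentially the part between http:// and the port number, while the myInfoComparator value for this Eureka Server instance is determined as follows:

  • If eureka.instance.hostname=xxx is configured explicitly, that value is the instance's host
  • Otherwise it comes from InetUtils, which is used when EurekaClientAutoConfiguration creates the EurekaInstanceConfigBean. InetUtils is Spring Cloud's networking utility: it first takes the IP of the first non-loopback network interface (beware of pitfalls in Docker containers), then resolves the hostname for that IP via InetAddress; as far as I know this comes from a file such as /etc/hosts on Linux or from the hostname environment variable
  • If eureka.client.transport.applicationsResolverUseIp=true, the instance's IP is used for the comparison instead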
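The comparison can be sketched in isolation. This assumes hostFromUrl() behaves like java.net.URI#getHost() (my reading of the Eureka source, not verified here); the class PeerUrlMatcher and its flat String parameters are hypothetical simplifications, not Eureka's signatures.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Simplified sketch of PeerEurekaNodes#isThisMyUrl()/isInstanceURL().
// hostFromUrl() is assumed to behave like java.net.URI#getHost().
class PeerUrlMatcher {
    // Host part of a service URL (between "http://" and the port), or null if unparsable.
    static String hostFromUrl(String url) {
        try {
            return new URI(url).getHost();
        } catch (URISyntaxException e) {
            return null;
        }
    }

    // Compare the URL host with this instance's hostname, or with its IP address
    // when eureka.client.transport.applicationsResolverUseIp is true.
    static boolean isInstanceUrl(String url, String myHostName, String myIp, boolean resolveByIp) {
        String hostName = hostFromUrl(url);
        String myInfoComparator = resolveByIp ? myIp : myHostName;
        return hostName != null && hostName.equals(myInfoComparator);
    }
}
```

A practical consequence: if service-url lists http://localhost:8761/eureka/ while the instance hostname resolves to the machine name, the two strings differ, so the server does not recognize its own URL and keeps it in the peer list.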


updatePeerEurekaNodes(): updating the PeerEurekaNodes list

// PeerEurekaNodes#updatePeerEurekaNodes()
// newPeerUrls is the list of Eureka peer URLs for this update
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }

    // Old peerEurekaNodeUrls minus newPeerUrls: surplus nodes that can be shut down
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);

    // newPeerUrls minus old peerEurekaNodeUrls: nodes that need to be added
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);

    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }

    // Remove peers no longer available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

    // Shut down the surplus nodes
    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // Add new peers
    // Create a PeerEurekaNode for each new URL via createPeerEurekaNode()
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }

    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
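The add/remove decision boils down to two set differences. This minimal sketch (hypothetical class PeerUrlDiff, plain URL strings instead of PeerEurekaNode objects) isolates that step:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the set arithmetic in updatePeerEurekaNodes().
class PeerUrlDiff {
    final Set<String> toShutdown;
    final Set<String> toAdd;

    PeerUrlDiff(Set<String> currentUrls, List<String> newUrls) {
        // current - new: peers no longer configured, to be shut down
        toShutdown = new HashSet<>(currentUrls);
        toShutdown.removeAll(newUrls);
        // new - current: newly configured peers, to be created
        toAdd = new HashSet<>(newUrls);
        toAdd.removeAll(currentUrls);
    }

    // Corresponds to the early "No change" return.
    boolean unchanged() {
        return toShutdown.isEmpty() && toAdd.isEmpty();
    }
}
```

URLs present in both lists end up in neither set, so their existing nodes are kept untouched rather than being recreated on every 10-minute refresh.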


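2. PeerAwareInstanceRegistry#init(peerEurekaNodes): initializing the cluster-aware instance registry

Using the peerEurekaNodes initialized in the previous step, this initializes the PeerAwareInstanceRegistry, the instance registry that takes the peers in the cluster into account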

// PeerAwareInstanceRegistryImpl#init()
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
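    // [Important] Start the timer behind numberOfReplicationsLastMin, which counts replications over the last minute
    this.numberOfReplicationsLastMin.start();

    this.peerEurekaNodes = peerEurekaNodes;

    // [Important] Initialize the ResponseCache: caches for clients' service-list queries (all applications, deltas, single application)
    // responseCacheUpdateIntervalMs = 30s by default
    initializedResponseCache();

    // [Important] Schedule the periodic renewal-threshold update, every 900s by default
    //  (calls PeerAwareInstanceRegistryImpl#updateRenewalThreshold())
    scheduleRenewalThresholdUpdateTask();

    // Initialize remote-region registry information (by default there is no remote region; everything uses us-east-1)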
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}


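numberOfReplicationsLastMin: replications received from peer nodes in the last minute

numberOfReplicationsLastMin is a com.netflix.eureka.util.MeasuredRate that measures how many replicated operations (such as renewals) were received from peer nodes during the last minute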

// MeasuredRate#start()
public synchronized void start() {
    if (!isActive) {
        timer.schedule(new TimerTask() {

            @Override
            public void run() {
                try {
                    // Zero out the current bucket.
                    // Move the current bucket's count into lastBucket and reset the current bucket to 0
                    lastBucket.set(currentBucket.getAndSet(0));
                } catch (Throwable e) {
                    logger.error("Cannot reset the Measured Rate", e);
                }
            }
        }, sampleInterval, sampleInterval);

        isActive = true;
    }
}

/**
 * Returns the count in the last sample interval.
 * (for a one-minute MeasuredRate, that is the last minute's count)
 */
public long getCount() {
    return lastBucket.get();
}

/**
 * Increments the count in the current sample interval.
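 * Increments the current bucket's count. It is called from two places (on different MeasuredRate instances):
 * AbstractInstanceRegistry#renew() - renewals (renewsLastMin)
 * PeerAwareInstanceRegistryImpl#replicateToPeers() - replicated operations, when isReplication is true (numberOfReplicationsLastMin)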
 */
public void increment() {
    currentBucket.incrementAndGet();
}
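The bucket swap is easiest to see without the Timer. In this simplified sketch (hypothetical class BucketCounter) the rollover() method stands in for the TimerTask that fires every sampleInterval:

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of MeasuredRate's two buckets. The real class rolls the buckets
// over from a java.util.Timer every sampleInterval; here rollover() is called by hand.
class BucketCounter {
    private final AtomicLong lastBucket = new AtomicLong(0);
    private final AtomicLong currentBucket = new AtomicLong(0);

    // Called for every counted event (renewal, replication) in the current interval.
    void increment() {
        currentBucket.incrementAndGet();
    }

    // Body of the timer task: publish the current count as the last interval's
    // count and reset the current bucket in a single atomic getAndSet.
    void rollover() {
        lastBucket.set(currentBucket.getAndSet(0));
    }

    // Count of the last *completed* interval, not of the one in progress.
    long getCount() {
        return lastBucket.get();
    }
}
```

getCount() therefore lags by up to one interval: events in the current minute only become visible after the next rollover.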


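Initializing the ResponseCache

The ResponseCache mainly caches service-list information. According to its Javadoc, the cache is kept in both compressed and uncompressed form for three kinds of requests: all applications, delta changes, and single applications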

// ResponseCacheImpl constructor
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value>  readWriteCacheMap;

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
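    // eureka.server.useReadOnlyResponseCache decides whether the read-only ResponseCache is used (default true).
    // The ResponseCache keeps a read-write readWriteCacheMap and a read-only readOnlyCacheMap;
    // this setting controls whether get() reads application data from the read-only map or from the read-write map.
    // The read-only map is refreshed periodically.
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    // eureka.server.responseCacheUpdateIntervalMs: cache refresh interval, 30s by default
    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();

    // Build the read-write map, a com.google.common.cache.LoadingCache,
    // with an initial capacity, expire-after-write time, removal listener, etc.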
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(1000)
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            Value value = generatePayload(key);
                            return value;
                        }
                    });

    // With the read-only cache enabled, run getCacheUpdateTask() every responseCacheUpdateIntervalMs (30s)
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
    }
}
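The Date passed as the first run time to timer.schedule(...) above is plain integer arithmetic: it rounds the current time down to a multiple of responseCacheUpdateIntervalMs and adds one interval. The hypothetical helper below replays it; with the 30s default, the refreshes therefore fall on :00 and :30 of the clock rather than 30s after startup.

```java
// Hypothetical helper replaying the first-run time computed in the ResponseCacheImpl constructor.
class AlignedSchedule {
    // Round "now" down to a multiple of the interval (integer division), then add
    // one interval: the first run lands on the next interval boundary.
    static long firstRunMillis(long nowMillis, long intervalMillis) {
        return (nowMillis / intervalMillis) * intervalMillis + intervalMillis;
    }
}
```

Even when "now" sits exactly on a boundary (60 000 ms with a 30 000 ms interval), the first run is the following boundary (90 000 ms).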

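So the ResponseCache maintains two maps: the read-write readWriteCacheMap, which the CacheLoader in the constructor populates on demand via generatePayload(), and the read-only readOnlyCacheMap, which by default is refreshed every 30s. Let's look at getCacheUpdateTask() in detail: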

// ResponseCacheImpl#getCacheUpdateTask()
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");

            // Iterate over the keys in the read-only map
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // If the read-only value differs from the read-write value, overwrite the read-only one
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache", th);
                }
            }
        }
    };
}

Every 30s the read-only map is compared with the read-write map, and the read-write value wins.
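The interplay of the two maps can be reproduced with plain JDK maps. In this simplified sketch (hypothetical class TwoLevelCache) a ConcurrentHashMap plus a loader function stands in for the Guava LoadingCache, and the read path (read-only map first, falling back to the read-write map on a miss) reflects my reading of ResponseCacheImpl rather than code quoted above. It shows that after an invalidation, readers keep seeing the old value until the next sync run, which is one reason a change can take up to 30s to become visible to clients.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Simplified model of ResponseCacheImpl's two maps. A ConcurrentHashMap plus a loader
// function stands in for the Guava LoadingCache (no expiry, no removal listener).
class TwoLevelCache<K, V> {
    private final Map<K, V> readWriteMap = new ConcurrentHashMap<>();
    private final Map<K, V> readOnlyMap = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // plays the role of generatePayload(key)

    TwoLevelCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Read path with the read-only cache enabled: serve from the read-only map and,
    // on a miss, load through the read-write map and copy the value over.
    V get(K key) {
        V value = readOnlyMap.get(key);
        if (value == null) {
            value = readWriteMap.computeIfAbsent(key, loader);
            readOnlyMap.put(key, value);
        }
        return value;
    }

    // A registry change only drops the read-write entry; the read-only map is untouched.
    void invalidate(K key) {
        readWriteMap.remove(key);
    }

    // Body of the periodic update task: for each key already in the read-only map,
    // fetch (reloading if needed) from the read-write map and replace the read-only
    // value when the reference differs.
    void syncReadOnlyFromReadWrite() {
        for (K key : readOnlyMap.keySet()) {
            V fresh = readWriteMap.computeIfAbsent(key, loader);
            if (fresh != readOnlyMap.get(key)) {
                readOnlyMap.put(key, fresh);
            }
        }
    }
}
```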


scheduleRenewalThresholdUpdateTask: periodically updating the renewal threshold

/**
 * Schedule the task that updates <em>renewal threshold</em> periodically.
 * The renewal threshold would be used to determine if the renewals drop
 * dramatically because of network partition and to protect expiring too
 * many instances at a time.
 * Runs every eureka.server.renewalThresholdUpdateIntervalMs (900 seconds by default).
 */
private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}

The threshold itself is updated in updateRenewalThreshold():

// PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
/**
 * Updates the <em>renewal threshold</em> based on the current number of
 * renewals. The threshold is a percentage as specified in
 * {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
 * received per minute {@link #getNumOfRenewsInLastMin()}.
 */
private void updateRenewalThreshold() {
    try {
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        // Count all registerable instances
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }

        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold of if the self preservation is disabled.
            // i.e. update only if count * 2 exceeds the current value scaled by the percentage, or if self-preservation is disabled
            if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
                    || (!this.isSelfPreservationModeEnabled())) {
                this.expectedNumberOfRenewsPerMin = count * 2;
                this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}

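Roughly speaking: first count all instances; by default each instance is expected to renew twice per minute (once every 30s), so count * 2 is the expected number of renewals per minute.

  • If self-preservation is disabled, expectedNumberOfRenewsPerMin (expected renewals per minute) and numberOfRenewsPerMinThreshold (renewal threshold per minute) are always updated
  • If self-preservation is enabled, they are updated only when count * 2 exceeds renewalPercentThreshold * numberOfRenewsPerMinThreshold. So when the number of instances drops sharply (for example during a network partition), the threshold is not lowered: while self-preservation is in effect, instances must not be evicted, so the threshold should not follow the drop either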
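A small numeric walk-through of the condition as quoted from eureka-core 1.6.2. The class RenewalThreshold is a hypothetical, stripped-down stand-in for PeerAwareInstanceRegistryImpl's two fields; renewalPercentThreshold defaults to 0.85.

```java
// Stripped-down, hypothetical stand-in for the two fields and the condition of
// PeerAwareInstanceRegistryImpl#updateRenewalThreshold() in eureka-core 1.6.2.
class RenewalThreshold {
    int expectedNumberOfRenewsPerMin;
    int numberOfRenewsPerMinThreshold;

    RenewalThreshold(int expectedNumberOfRenewsPerMin, int numberOfRenewsPerMinThreshold) {
        this.expectedNumberOfRenewsPerMin = expectedNumberOfRenewsPerMin;
        this.numberOfRenewsPerMinThreshold = numberOfRenewsPerMinThreshold;
    }

    // count = number of registerable instances; each renews every 30s => count * 2 per minute.
    void update(int count, double renewalPercentThreshold, boolean selfPreservationEnabled) {
        int renewsPerMin = count * 2;
        // Same comparison as the quoted 1.6.2 code (against the threshold, not the expected value).
        if (renewsPerMin > renewalPercentThreshold * numberOfRenewsPerMinThreshold
                || !selfPreservationEnabled) {
            expectedNumberOfRenewsPerMin = renewsPerMin;
            numberOfRenewsPerMinThreshold = (int) (renewsPerMin * renewalPercentThreshold);
        }
    }
}
```

Starting from 10 instances (expected 20, threshold 17): growing to 12 instances gives 24 > 0.85 * 17, so the values become 24 and 20; a drop to 5 instances gives 10 > 0.85 * 20 = 17, which is false, so with self-preservation on the threshold stays at 20.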

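However, the updateRenewalThreshold() code above is flawed in both the comment and the condition (current version: eureka-core-1.6.2). For instance, the comment reads "of if" where "or if" is meant, and numberOfRenewsPerMinThreshold already has renewalPercentThreshold applied, so multiplying it by renewalPercentThreshold again compares count * 2 with roughly 0.85 * 0.85 of the previously expected renewals.

This was not fixed until v1.9.3: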


https://github.com/Netflix/eureka/commit/a4dd6b22ad447c706234e63fe83cb58413f7618b#diff-4aec7ea96457f5084840fc40f501c320

Two later versions then changed this calculation logic again and extracted it into a separate method:

Add possibility to configure expected interval between clients' renews and not break self-preservation

Extract calculation of renews threshold to separate method


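[Key point 2] EurekaServerBootstrap initialization

The auto-configuration above has already registered the Jersey filter that handles all /eureka/** requests, so client registrations, renewals and so on can already be processed. The remaining work is done in EurekaServerBootstrap#contextInitialized(), once the Spring container has essentially finished refresh():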

// EurekaServerBootstrap#contextInitialized()
public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();    // initialize the environment
        initEurekaServerContext();  // initialize the context

        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}

Of these two, we mainly care about the context initialization, initEurekaServerContext():

// EurekaServerBootstrap#initEurekaServerContext()
protected void initEurekaServerContext() throws Exception {
    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);

    // Running on AWS?
    if (isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }

    // Let EurekaServerContextHolder hold the serverContext
    EurekaServerContextHolder.initialize(this.serverContext);

    log.info("Initialized server context");

    // Copy registry from neighboring eureka node
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);

    // Register all monitoring statistics.
    EurekaMonitors.registerAllStats();
}

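There are two important steps:

  • registry.syncUp(): copies the registry from a neighboring Eureka node
  • registry.openForTraffic(): starts serving client traffic, i.e. begins acting as a server


1. registry.syncUp(): copying the registry from a neighboring Eureka node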

/**
 * Populates the registry information from a peer eureka node. This
 * operation fails over to other nodes until the list is exhausted if the
 * communication fails.
 */
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;

    // Loop, retrying at most RegistrySyncRetries times (5 when the server also registers with Eureka, as set above)
    // The eurekaClient logic itself fails over to the other Eureka nodes
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); // 30s by default
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }

        // Get the service list from eurekaClient
        Applications apps = eurekaClient.getApplications();
        // Iterate over the service list and register each instance
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
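The retry/sleep shape of syncUp() can be sketched on its own. Here a hypothetical IntSupplier stands in for "fetch eurekaClient.getApplications() and register every registerable instance, returning how many were registered"; unlike the original, the sketch restores the thread's interrupt flag before breaking out. With 5 retries, the 30s wait, and peers that return nothing, the loop sleeps 4 times, i.e. about 4 * 30s = 120s before openForTraffic() is reached.

```java
import java.util.function.IntSupplier;

// Sketch of the syncUp() retry shape. fetchAndRegister is a hypothetical stand-in for
// "fetch eurekaClient.getApplications() and register each registerable instance",
// returning how many instances were registered in that attempt.
class SyncUpRetry {
    static int syncUp(IntSupplier fetchAndRegister, int retries, long waitMs) {
        int count = 0;
        // Stop as soon as one attempt registered something, or when retries run out.
        for (int i = 0; i < retries && count == 0; i++) {
            if (i > 0) { // no wait before the first attempt
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    break;
                }
            }
            count += fetchAndRegister.getAsInt();
        }
        return count;
    }
}
```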


2. registry.openForTraffic(): allowing traffic from clients

// InstanceRegistry#openForTraffic()
/**
 * If
 * {@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)}
 * is called with a zero argument, it means that leases are not automatically
 * cancelled if the instance hasn't sent any renewals recently. This happens for a
 * standalone server. It seems like a bad default, so we set it to the smallest
 * non-zero value we can, so that any instances that subsequently register can bump up
 * the threshold.
 */
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // If count == 0, i.e. no service list was obtained from a neighboring node (e.g. a standalone server), use defaultOpenForTrafficCount = 1
    super.openForTraffic(applicationInfoManager,
            count == 0 ? this.defaultOpenForTrafficCount : count);
}


// PeerAwareInstanceRegistryImpl#openForTraffic()
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // Expected renewals per minute (one renewal every 30s by default, i.e. 2 per 60s)
    this.expectedNumberOfRenewsPerMin = count * 2;

    // Renewal threshold per minute: renewalPercentThreshold (85% by default) * expectedNumberOfRenewsPerMin
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got " + count + " instances from neighboring DS node");
    logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);

    this.startupTime = System.currentTimeMillis();
    if (count > 0) { // with the default defaultOpenForTrafficCount = 1, count is never 0 here, so peerInstancesTransferEmptyOnStartup always ends up false
                     // (it is used in PeerAwareInstanceRegistryImpl#shouldAllowAccess(boolean))
        this.peerInstancesTransferEmptyOnStartup = false;
    }

    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }

    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);

    // Start a new EvictionTask
    super.postInit();
}

// AbstractInstanceRegistry#postInit()
protected void postInit() {
    renewsLastMin.start(); // timer behind the renewals-in-the-last-minute statistic

    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),  // 60s by default
            serverConfig.getEvictionIntervalTimerInMs());
}
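  • If no instances were obtained from a neighboring Eureka node, count defaults to 1
  • Initialize the expected renewals per minute: expectedNumberOfRenewsPerMin = count * 2
  • Initialize the per-minute renewal threshold: numberOfRenewsPerMinThreshold = 85% * expectedNumberOfRenewsPerMin
  • Set the instance status to UP via applicationInfoManager
  • Start a new EvictionTask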
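Plugging numbers into openForTraffic() helps: for a standalone server (syncUp() returned 0, defaultOpenForTrafficCount = 1) the expected renewals are 2 per minute and the threshold is (int) (2 * 0.85) = 1. The hypothetical helper below just replays that arithmetic; it is not Eureka code.

```java
// Replays the arithmetic of InstanceRegistry#openForTraffic() followed by
// PeerAwareInstanceRegistryImpl#openForTraffic(). Hypothetical helper, not Eureka code.
class OpenForTraffic {
    // Returns {expectedNumberOfRenewsPerMin, numberOfRenewsPerMinThreshold}.
    static int[] initialThresholds(int syncedCount, int defaultOpenForTrafficCount,
                                   double renewalPercentThreshold) {
        // InstanceRegistry substitutes defaultOpenForTrafficCount when nothing was synced.
        int count = syncedCount == 0 ? defaultOpenForTrafficCount : syncedCount;
        int expected = count * 2; // one renewal every 30s => 2 per minute
        int threshold = (int) (expected * renewalPercentThreshold); // truncates toward zero
        return new int[] {expected, threshold};
    }
}
```

With defaultOpenForTrafficCount = 0 (the value the InstanceRegistryProperties comment recommends for peer-replicated servers), an empty sync yields a threshold of 0.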


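II. Eureka Server handling registration requests

After the auto-configuration and initialization above, the Eureka Server has started and can handle all kinds of requests through Jersey; registration requests specifically are handled by com.netflix.eureka.resources.ApplicationResource#addInstance()

ApplicationResource#addInstance() - registering a single application instance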

// ApplicationResource#addInstance()
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);

    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

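    // 【 Register the current instance through the cluster-aware PeerAwareInstanceRegistry 】
    // isReplication tells whether this request is a replication between peer nodes; here isReplication == null, so "true".equals(isReplication) is false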
    registry.register(info, "true".equals(isReplication));

    return Response.status(204).build();  // 204 to be backwards compatible
                                          // a successful registration returns status 204
}

The key call is registry.register(info, "true".equals(isReplication)), which registers the current instance through the cluster-aware PeerAwareInstanceRegistry.
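The validation chain and the isReplication conversion in addInstance() can be sketched in plain Java as follows. This is a minimal sketch, not Eureka's code: InstanceStub, RegisterDecision and RegisterValidationSketch are hypothetical names, the DataCenterInfo checks are left out, and the JAX-RS Response is reduced to a status code plus message.

```java
// Hypothetical stand-in for the InstanceInfo fields that addInstance() checks first.
final class InstanceStub {
    final String id;
    final String hostName;
    final String appName;

    InstanceStub(String id, String hostName, String appName) {
        this.id = id;
        this.hostName = hostName;
        this.appName = appName;
    }
}

// Hypothetical result: HTTP status, error message (null on success) and the parsed replication flag.
final class RegisterDecision {
    final int status;
    final String message;
    final boolean isReplication;

    RegisterDecision(int status, String message, boolean isReplication) {
        this.status = status;
        this.message = message;
        this.isReplication = isReplication;
    }
}

final class RegisterValidationSketch {
    // Approximates isBlank(): null, empty and whitespace-only strings count as blank.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Checks run in the same order as in addInstance(); the first failure yields a 400.
    static RegisterDecision validate(String pathAppName, InstanceStub info, String isReplicationHeader) {
        if (isBlank(info.id)) {
            return new RegisterDecision(400, "Missing instanceId", false);
        }
        if (isBlank(info.hostName)) {
            return new RegisterDecision(400, "Missing hostname", false);
        }
        if (isBlank(info.appName)) {
            return new RegisterDecision(400, "Missing appName", false);
        }
        if (!pathAppName.equals(info.appName)) {
            return new RegisterDecision(400,
                    "Mismatched appName, expecting " + pathAppName + " but was " + info.appName, false);
        }
        // "true".equals(null) is false, so a request without the replication header
        // (a client registering itself) is treated as a non-replicated registration.
        boolean isReplication = "true".equals(isReplicationHeader);
        return new RegisterDecision(204, null, isReplication);
    }
}
```

A peer node that forwards the registration sends the header with the value "true", so on the receiving side the same call yields isReplication == true.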


PeerAwareInstanceRegistryImpl#register() - register the instance information and replicate it to the other Eureka nodes

// PeerAwareInstanceRegistryImpl#register()
/**
 * Registers the information about the {@link InstanceInfo} and replicates
 * this information to all peer eureka nodes. If this is replication event
 * from other replica nodes then it is not replicated.
 *
 * @param info
 *            the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 *            true if this is a replication event from other replica nodes,
 *            false otherwise.
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; // the default lease duration is 90s

    // if the instance's own lease info specifies a duration, use that instead
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }

    // 【 register the instance on this Eureka Server 】
    super.register(info, leaseDuration, isReplication);

    // 【 replicate the registration to the other nodes in the cluster 】
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
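  • The default lease duration is 90s; if the instance's lease info carries its own duration (greater than 0), that value is used instead
  • 【Key point】Register the instance on the current Eureka Server
  • 【Key point】Replicate the registration to the other nodes in the cluster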
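The lease-duration choice in the first bullet is small enough to show directly. In this sketch LeaseInfoStub and LeaseDurationSketch are hypothetical names; the constant 90 matches the default used by register(). On a Spring Cloud client the instance's own value typically comes from eureka.instance.lease-expiration-duration-in-seconds.

```java
// Hypothetical stand-in for LeaseInfo: only the lease duration matters here.
final class LeaseInfoStub {
    final int durationInSecs;

    LeaseInfoStub(int durationInSecs) {
        this.durationInSecs = durationInSecs;
    }
}

final class LeaseDurationSketch {
    // Same value as the 90s default lease duration used by register().
    static final int DEFAULT_DURATION_IN_SECS = 90;

    // Use the instance's own duration only when lease info is present and the value is positive.
    static int effectiveLeaseDuration(LeaseInfoStub leaseInfo) {
        int leaseDuration = DEFAULT_DURATION_IN_SECS;
        if (leaseInfo != null && leaseInfo.durationInSecs > 0) {
            leaseDuration = leaseInfo.durationInSecs;
        }
        return leaseDuration;
    }
}
```

A missing lease info or a non-positive duration both fall back to 90 seconds.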


AbstractInstanceRegistry#register(): registration

/**
 * Registers a new instance with a given duration.
 *
 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock(); // acquire the read lock

        // registry is the Map holding all application instances: ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
        // look up all instances registered under the current appName
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());

        REGISTER.increment(isReplication); // increment the registration counter

        // if there are no instances for this appName yet, create a new Map
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }

        // get the instance's existing Lease, if any
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        // (compare the LastDirtyTimestamp of the incoming instance with that of the existing lease and keep the later one)
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        }
        else {
            // The lease does not exist and hence it is a new registration
            // No lease existed before, so this is a new instance registration:
            // expectedNumberOfRenewsPerMin grows by 2 (one renewal every 30s)
            // and numberOfRenewsPerMinThreshold is recalculated (85% by default)
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // The client is registering, so raise the expected number of renewals
                    // (1 for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }

        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease); // store the instance's lease in the registry Map

        // record the registration in the recently-registered queue (synchronized)
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }

        // This is where the initial state transfer of overridden status happens
        // if the registrant already carries an OverriddenStatus, also put it into this Eureka Server's overriddenInstanceStatusMap
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus
            = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }

        registrant.setActionType(ActionType.ADDED); // ActionType is ADDED
        recentlyChangedQueue.add(new RecentlyChangedItem(lease)); // record in recentlyChangedQueue
        registrant.setLastUpdatedTimestamp(); // update the last-updated timestamp

        // invalidate the ResponseCache entries for this application
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock(); // release the read lock
    }
}
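The two-level registry map and the last-dirty-timestamp rule in register() can be sketched with plain JDK collections. RegistryInstance, RegistryLease and RegistrySketch are hypothetical simplified types; the read lock, renewal-threshold update, recent queues, overridden status and cache invalidation of the real method are all left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical minimal instance: identity plus the lastDirtyTimestamp used for conflict resolution.
final class RegistryInstance {
    final String appName;
    final String id;
    final long lastDirtyTimestamp;
    final String version; // hypothetical label so callers can tell two registrations apart

    RegistryInstance(String appName, String id, long lastDirtyTimestamp, String version) {
        this.appName = appName;
        this.id = id;
        this.lastDirtyTimestamp = lastDirtyTimestamp;
        this.version = version;
    }
}

// Hypothetical lease wrapper; the real Lease also tracks registration and renewal timestamps.
final class RegistryLease {
    final RegistryInstance holder;

    RegistryLease(RegistryInstance holder) {
        this.holder = holder;
    }
}

final class RegistrySketch {
    // appName -> (instanceId -> lease): the same two-level shape as the registry field.
    private final ConcurrentHashMap<String, Map<String, RegistryLease>> registry = new ConcurrentHashMap<>();

    // Returns the instance that actually ends up stored.
    RegistryInstance register(RegistryInstance registrant) {
        Map<String, RegistryLease> gMap = registry.get(registrant.appName);
        if (gMap == null) {
            // putIfAbsent keeps whichever inner map was installed first if two threads race here.
            ConcurrentHashMap<String, RegistryLease> gNewMap = new ConcurrentHashMap<>();
            gMap = registry.putIfAbsent(registrant.appName, gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        RegistryLease existingLease = gMap.get(registrant.id);
        // Keep the existing instance if its dirty timestamp is newer than the incoming one.
        if (existingLease != null && existingLease.holder != null
                && existingLease.holder.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existingLease.holder;
        }
        gMap.put(registrant.id, new RegistryLease(registrant));
        return registrant;
    }

    RegistryInstance lookup(String appName, String id) {
        Map<String, RegistryLease> gMap = registry.get(appName);
        RegistryLease lease = (gMap == null) ? null : gMap.get(id);
        return (lease == null) ? null : lease.holder;
    }

    int applicationCount() {
        return registry.size();
    }
}
```

Re-registering an instance with an older dirty timestamp therefore leaves the stored copy unchanged, while a newer timestamp replaces it.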
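  • Build the Lease for the current instance and store it in the Map where the Eureka Server keeps registrations, ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>, keyed as appName → <instance id, Lease>

  • For a new registration, expectedNumberOfRenewsPerMin is increased by 2 and numberOfRenewsPerMinThreshold is recalculated

  • Maintain recentRegisteredQueue (recent registrations) and recentlyChangedQueue (recent changes), so that recent operations can be looked up later

  • If the registering instance already carries an OverriddenStatus, record it in this server's overriddenInstanceStatusMap and apply the overridden-status rules to set the instance status

  • Set the instance's last-updated timestamp

  • Invalidate the ResponseCache entries for the current application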
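The counter update in the second bullet is plain arithmetic. The sketch below assumes the default renewal-percent threshold of 0.85 (the 85% in the code comment) and a starting value of 1, the eureka.server.expectedNumberOfRenewsPerMin:1 default from InstanceRegistryProperties; RenewThresholdSketch is a hypothetical class, not Eureka's. It also shows why that default must be non-zero: with 0, the guard in register() never lets the count grow.

```java
// Hypothetical holder for the two counters that register() updates on a new registration.
final class RenewThresholdSketch {
    private final Object lock = new Object();
    private final double renewalPercentThreshold;
    private int expectedNumberOfRenewsPerMin;
    private int numberOfRenewsPerMinThreshold;

    RenewThresholdSketch(int initialExpectedRenewsPerMin, double renewalPercentThreshold) {
        this.expectedNumberOfRenewsPerMin = initialExpectedRenewsPerMin;
        this.renewalPercentThreshold = renewalPercentThreshold;
        this.numberOfRenewsPerMinThreshold = (int) (initialExpectedRenewsPerMin * renewalPercentThreshold);
    }

    // Same rule as the else-branch in register(): +2 per new instance (one renewal every 30s),
    // then threshold = expected * percent, truncated to int. Nothing changes while expected is 0.
    void onNewRegistration() {
        synchronized (lock) {
            if (expectedNumberOfRenewsPerMin > 0) {
                expectedNumberOfRenewsPerMin += 2;
                numberOfRenewsPerMinThreshold = (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold);
            }
        }
    }

    int expected() {
        synchronized (lock) {
            return expectedNumberOfRenewsPerMin;
        }
    }

    int threshold() {
        synchronized (lock) {
            return numberOfRenewsPerMinThreshold;
        }
    }
}
```

Starting from 1 and registering three new instances gives 1 + 3 × 2 = 7 expected renewals per minute and a threshold of (int)(7 × 0.85) = (int) 5.95 = 5.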


PeerAwareInstanceRegistryImpl#replicateToPeers(): replicate to the Eureka peer nodes

// PeerAwareInstanceRegistryImpl#replicateToPeers()
/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // if this is a replication event, count it (for a client's own registration this is false)
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }

        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        // iterate over all nodes in the cluster (this node itself is skipped below)
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }

            // replicate the instance action to this peer node
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    }
    finally {
        tracer.stop();
    }
}
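The early return for replicated events and the self-skip in replicateToPeers() decide which peers receive an action. A minimal sketch, assuming peers are identified by their service URL strings; ReplicateToPeersSketch is a hypothetical name, and the real peerEurekaNodes.isThisMyUrl() check is replaced by plain string equality.

```java
import java.util.ArrayList;
import java.util.List;

final class ReplicateToPeersSketch {
    // Returns the peer URLs that an action would be replicated to.
    static List<String> replicationTargets(List<String> peerUrls, String myUrl, boolean isReplication) {
        List<String> targets = new ArrayList<>();
        // An event that is itself a replication is never forwarded again;
        // this keeps peers from bouncing the same registration back and forth.
        if (isReplication) {
            return targets;
        }
        for (String url : peerUrls) {
            // Simplification: string equality stands in for isThisMyUrl().
            if (url.equals(myUrl)) {
                continue;
            }
            targets.add(url);
        }
        return targets;
    }
}
```

In a three-node cluster, a registration received directly from a client is forwarded to the two other nodes; those requests arrive with isReplication == true, so each receiver stores the instance without forwarding it again.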

Below is replicateInstanceActionsToPeers(), which replicates an instance action to another node:

// PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers()
/**
 * Replicates all instance changes to peer eureka nodes except for
 * replication traffic to this node.
 *
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:  // cancel
                node.cancel(appName, id);
                break;
            case Heartbeat:  // heartbeat
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:  // register
                node.register(info);
                break;
            case StatusUpdate:  // status update
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:  // delete the overridden status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
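replicateInstanceActionsToPeers() wraps each peer call in its own try/catch, so a failure on one node is logged and does not stop replication to the remaining nodes. The sketch below shows that structure with hypothetical types (ReplicationAction, RecordingPeerNode, ActionDispatchSketch); only Register and Cancel are wired up, and the real calls go through PeerEurekaNode rather than a local class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical copy of the action names used by PeerAwareInstanceRegistryImpl.Action.
enum ReplicationAction { Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride }

// Hypothetical peer that records calls and can be told to fail, standing in for PeerEurekaNode.
final class RecordingPeerNode {
    final String serviceUrl;
    final boolean failing;
    final List<String> received = new ArrayList<>();

    RecordingPeerNode(String serviceUrl, boolean failing) {
        this.serviceUrl = serviceUrl;
        this.failing = failing;
    }

    void register(String appName, String id) {
        if (failing) {
            throw new IllegalStateException("peer down: " + serviceUrl);
        }
        received.add("register " + appName + "/" + id);
    }

    void cancel(String appName, String id) {
        if (failing) {
            throw new IllegalStateException("peer down: " + serviceUrl);
        }
        received.add("cancel " + appName + "/" + id);
    }
}

final class ActionDispatchSketch {
    // Per-node dispatch: any Throwable is caught, so one bad peer cannot abort the caller's loop.
    static boolean replicateToNode(ReplicationAction action, String appName, String id, RecordingPeerNode node) {
        try {
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Register:
                    node.register(appName, id);
                    break;
                default:
                    // Heartbeat, StatusUpdate and DeleteStatusOverride are omitted from this sketch.
                    break;
            }
            return true;
        } catch (Throwable t) {
            // The real method logs the error via logger.error(...) and carries on.
            return false;
        }
    }

    // Returns the URLs of peers whose replication call failed.
    static List<String> replicateToAll(ReplicationAction action, String appName, String id,
                                       List<RecordingPeerNode> nodes) {
        List<String> failed = new ArrayList<>();
        for (RecordingPeerNode node : nodes) {
            if (!replicateToNode(action, appName, id, node)) {
                failed.add(node.serviceUrl);
            }
        }
        return failed;
    }
}
```

With three peers where the middle one throws, the first and third still receive the registration and only the failing peer's URL is reported.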

Here we only care about the Register action:

// PeerEurekaNode#register()
/**
 * Sends the registration information of {@link InstanceInfo} receiving by
 * this node to the peer node represented by this class.
 *
 * @param info
 *            the instance information {@link InstanceInfo} of any instance
 *            that is send to this instance.
 * @throws Exception
 */
public void register(final InstanceInfo info) throws Exception {
    // expires at the current time + the lease renewal interval (30s by default)
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);

    // submit the operation to the batching dispatcher, which replicates tasks in batches
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null /* overriddenStatus */, true /* replicateInstanceInfo */) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

From here on, replicationClient.register(info) sends the registration much like the Eureka Client's own register request does.
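PeerEurekaNode#register() does not call the peer directly: it hands a task with an id and an expiry time to batchingDispatcher. The sketch below is a simplified, hypothetical model of that idea, not Eureka's task dispatcher: the id format requestType#appName/instanceId is an assumption, a later task with the same id replaces the pending one, and a task whose expiry time has passed is dropped instead of being sent. The expiry is now plus the renewal interval in milliseconds, matching the 30s default in the code comment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pending replication task: an id, a payload and an absolute expiry time in ms.
final class PendingTask {
    final String id;
    final String payload;
    final long expiryTime;

    PendingTask(String id, String payload, long expiryTime) {
        this.id = id;
        this.payload = payload;
        this.expiryTime = expiryTime;
    }
}

final class BatchingQueueSketch {
    // Insertion-ordered; re-putting an existing id keeps its original position.
    private final Map<String, PendingTask> pending = new LinkedHashMap<>();

    // Assumed id format: requestType#appName/instanceId.
    static String taskId(String requestType, String appName, String instanceId) {
        return requestType + "#" + appName + "/" + instanceId;
    }

    // expiryTime = now + renewal interval; 30s gives now + 30000 ms.
    static long expiryTime(long nowMillis, int renewalIntervalInSecs) {
        return nowMillis + renewalIntervalInSecs * 1000L;
    }

    // A newer task with the same id overwrites the pending one, so only the latest state is sent.
    synchronized void process(String id, String payload, long expiryTime) {
        pending.put(id, new PendingTask(id, payload, expiryTime));
    }

    // Takes every pending task; tasks that have already expired are discarded, not replicated.
    synchronized List<String> drainBatch(long nowMillis) {
        List<String> batch = new ArrayList<>();
        for (PendingTask task : pending.values()) {
            if (task.expiryTime > nowMillis) {
                batch.add(task.payload);
            }
        }
        pending.clear();
        return batch;
    }
}
```

Collapsing tasks by id means a burst of updates for one instance costs a single replication call, and the expiry bound stops a backlog of stale registrations from being pushed to a peer that was unreachable for a while.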

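This completes the analysis of Spring Cloud Eureka Server's auto-configuration and initialization, of how it accepts registration requests, and of how it replicates them to the peer nodes in the cluster.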

Overall sequence flow:

[Figure: sequence diagram of the whole service registration process, from the Eureka Client's register request to Eureka Server handling (part 2)]



01-24 12:46