Loading...
墨滴

lyq

2021/11/12  阅读:17  主题:默认主题

Spring Cloud Netflex Eureka Server源码分析(一)

Spring Cloud Netflex Eureka Server源码分析(一)


前言

上一篇文章sping cloud netflex eureka client篇通过源码知道 ,eureka Client 是通过 http rest来 与 eureka server 交互,实现 注册服务,续约服务,服务下线 等。本篇探究下eureka server。

基本用法

  1. 引入依赖
<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
  1. 配置application.yml
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8671/eureka
    # 单机环境下,以下两个配置项设置为false,集群环境下开启
    register-with-eureka: false
    fetch-registry: false
  1. 激活配置
@SpringBootApplication
//激活eureka server配置
@EnableEurekaServer
public class EurekaServerApplication {

 public static void main(String[] args) {
  SpringApplication.run(EurekaServerApplication.classargs);
 }

}

源码分析

配置阶段

@EnableEurekaServer

@EnableEurekaServer 注解为入口分析,通过源码可以看出他是一个标记注解:

/**
 * Annotation to activate Eureka Server related configuration.
 * {@link EurekaServerAutoConfiguration}
 *
 * @author Dave Syer
 * @author Biju Kunjummen
 *
 */


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer 
{

}

从注释可以知道,用来激活 eureka server 的 配置类 EurekaServerAutoConfiguration 中相关配置。同时通过@Import注解导入了EurekaServerMarkerConfiguration.class,先进入这个类:

/**
 * Responsible for adding in a marker bean to activate
 * {@link EurekaServerAutoConfiguration}.
 *
 * @author Biju Kunjummen
 */

@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {

 @Bean
 public Marker eurekaServerMarkerBean() {
  return new Marker();
 }

 class Marker {

 }

}

通过这个类的注释也可以知道,负责添加激活EurekaServerAutoConfiguration的标记bean,那么进入EurekaServerAutoConfiguration中:

EurekaServerAutoConfiguration
/**
 * @author Gunnar Hillert
 * @author Biju Kunjummen
 * @author Fahim Farook
 */

@Configuration(proxyBeanMethods = false)
//导入EurekaServerInitializerConfiguration配置类
@Import(EurekaServerInitializerConfiguration.class)
// 条件是容器中存在EurekaServerMarkerConfiguration.Marker.class类型的bean
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties(
{ EurekaDashboardProperties.classInstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer 
{

 /**
  * List of packages containing Jersey resources required by the Eureka server.
  */

 private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery""com.netflix.eureka" };

 @Autowired
 private ApplicationInfoManager applicationInfoManager;

 @Autowired
 private EurekaServerConfig eurekaServerConfig;

 @Autowired
 private EurekaClientConfig eurekaClientConfig;

 @Autowired
 private EurekaClient eurekaClient;

 @Autowired
 private InstanceRegistryProperties instanceRegistryProperties;

 /**
  * A {@link CloudJacksonJson} instance.
  */

 public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();

 @Bean
 public HasFeatures eurekaServerFeature() {
  return HasFeatures.namedFeature("Eureka Server", EurekaServerAutoConfiguration.class);
 }

 //加载EurekaController,SpringCloud 提供了一些额外的接口,用来获取eurekaServer的信息
 @Bean
 @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
 public EurekaController eurekaController() {
  return new EurekaController(this.applicationInfoManager);
 }

 //省略 ...

 //接收客户端的注册等请求就是通过InstanceRegistry来处理的,是真正处理业务的类,接下来会详细分析
 @Bean
 public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
  this.eurekaClient.getApplications(); // force initialization
  return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient,
    this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
    this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
 }

 //配置服务节点信息,这里的作用主要是为了配置Eureka的peer节点,也就是说当收到有节点注册上来的时候,需要通知给哪些服务节点。(互为一个集群)
 @Bean
 @ConditionalOnMissingBean
 public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs,
   ReplicationClientAdditionalFilters replicationClientAdditionalFilters)
 
{
  return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs,
    this.applicationInfoManager, replicationClientAdditionalFilters);
 }
 
 //EurekaServer上下文
 @Bean
 @ConditionalOnMissingBean
 public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry,
   PeerEurekaNodes peerEurekaNodes)
 
{
  return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes,
    this.applicationInfoManager);
 }

 // 这个类的作用是spring cloud和原生eureka的胶水代码,通过这个类来启动EurekaSever 
 // 后面这个类会在EurekaServerInitializerConfiguration被调用,进行eureka启动并且会同步其他注册中心的数据到当前注册中心
 @Bean
 public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
   EurekaServerContext serverContext)
 
{
  return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig,
    registry, serverContext);
 }

 /**
  * Register the Jersey filter.
  * @param eurekaJerseyApp an {@link Application} for the filter to be registered
  * @return a jersey {@link FilterRegistrationBean}
  */

  // 配置拦截器,ServletContainer里面实现了jersey框架,通过他来实现eurekaServer对外的restFull接口
 @Bean
 public FilterRegistrationBean<?> jerseyFilterRegistration(javax.ws.rs.core.Application eurekaJerseyApp) {
  FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
  bean.setFilter(new ServletContainer(eurekaJerseyApp));
  bean.setOrder(Ordered.LOWEST_PRECEDENCE);
  bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

  return bean;
 }

 //省略 ... 

 @Configuration(proxyBeanMethods = false)
 protected static class EurekaServerConfigBeanConfiguration {
  
  // 创建并加载EurekaServerConfig的实现类,主要是Eureka-server的配置信息
  @Bean
  @ConditionalOnMissingBean
  public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
   EurekaServerConfigBean server = new EurekaServerConfigBean();
   if (clientConfig.shouldRegisterWithEureka()) {
    // Set a sensible default if we are supposed to replicate
    server.setRegistrySyncRetries(5);
   }
   return server;
  }

 }

 /**
  * {@link PeerEurekaNodes} which updates peers when /refresh is invoked. Peers are
  * updated only if <code>eureka.client.use-dns-for-fetching-service-urls</code> is
  * <code>false</code> and one of following properties have changed.
  * <p>
  * </p>
  * <ul>
  * <li><code>eureka.client.availability-zones</code></li>
  * <li><code>eureka.client.region</code></li>
  * <li><code>eureka.client.service-url.&lt;zone&gt;</code></li>
  * </ul>
  */

 static class RefreshablePeerEurekaNodes extends PeerEurekaNodes
   implements ApplicationListener<EnvironmentChangeEvent
{

  private ReplicationClientAdditionalFilters replicationClientAdditionalFilters;

  RefreshablePeerEurekaNodes(final PeerAwareInstanceRegistry registry, final EurekaServerConfig serverConfig,
    final EurekaClientConfig clientConfig, final ServerCodecs serverCodecs,
    final ApplicationInfoManager applicationInfoManager,
    final ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
   super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager);
   this.replicationClientAdditionalFilters = replicationClientAdditionalFilters;
  }

  @Override
  protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
   JerseyReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig,
     serverCodecs, peerEurekaNodeUrl);

   this.replicationClientAdditionalFilters.getFilters().forEach(replicationClient::addReplicationClientFilter);

   String targetHost = hostFromUrl(peerEurekaNodeUrl);
   if (targetHost == null) {
    targetHost = "host";
   }
   return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
  }

  @Override
  public void onApplicationEvent(final EnvironmentChangeEvent event) {
   if (shouldUpdate(event.getKeys())) {
    updatePeerEurekaNodes(resolvePeerUrls());
   }
  }

  /*
   * Check whether specific properties have changed.
   */

  protected boolean shouldUpdate(final Set<String> changedKeys) {
   assert changedKeys != null;

   // if eureka.client.use-dns-for-fetching-service-urls is true, then
   // service-url will not be fetched from environment.
   if (this.clientConfig.shouldUseDnsForFetchingServiceUrls()) {
    return false;
   }

   if (changedKeys.contains("eureka.client.region")) {
    return true;
   }

   for (final String key : changedKeys) {
    // property keys are not expected to be null.
    if (key.startsWith("eureka.client.service-url.")
      || key.startsWith("eureka.client.availability-zones.")) {
     return true;
    }
   }
   return false;
  }

 }

 //省略 ... 

}

EurekaServerAutoConfiguration类上的注解@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)可以看到,当容器中有EurekaServerMarkerConfiguration.Marker.class,就可以激活该配置类,并且通过@Import(EurekaServerInitializerConfiguration.class) 导入了EurekaServerInitializerConfiguration类,那么接下来详细看下该配置类为我们做了什么。

EurekaServerInitializerConfiguration
@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration implements ServletContextAwareSmartLifecycleOrdered {

 private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);

 @Autowired
 private EurekaServerConfig eurekaServerConfig;

 private ServletContext servletContext;

 @Autowired
 private ApplicationContext applicationContext;

 @Autowired
 private EurekaServerBootstrap eurekaServerBootstrap;

 private boolean running;

 private int order = 1;

 @Override
 public void setServletContext(ServletContext servletContext) {
  this.servletContext = servletContext;
 }

 @Override
 public void start() {
  new Thread(() -> {
   try {
    // TODO: is this class even needed now?
    // 初始化EurekaServer,同时注册Eureka Server
    eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
    log.info("Started Eureka Server");
    
    //发布EurekaServer注册事件
    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
    // 设置启动的状态为true
    EurekaServerInitializerConfiguration.this.running = true;
    
    // 发送Eureka Start 事件 , 其他还有各种事件,我们可以监听这种时间,然后做一些特定的业务需求
    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
   }
   catch (Exception ex) {
    // Help!
    log.error("Could not initialize Eureka servlet context", ex);
   }
  }).start();
 }

 private EurekaServerConfig getEurekaServerConfig() {
  return this.eurekaServerConfig;
 }

 private void publish(ApplicationEvent event) {
  this.applicationContext.publishEvent(event);
 }

 @Override
 public void stop() {
  this.running = false;
  eurekaServerBootstrap.contextDestroyed(this.servletContext);
 }

 @Override
 public boolean isRunning() {
  return this.running;
 }

 @Override
 public int getPhase() {
  return 0;
 }

 @Override
 public boolean isAutoStartup() {
  return true;
 }

 @Override
 public int getOrder() {
  return this.order;
 }

}

EurekaServerInitializerConfiguration配置类实现了SmartLifecycle,我们知道实现了SmartLifecycle接口的,会在Ioc容器中所有Bean初始化完成后,根据isAutoStartup()方法返回true来执行该配置类的start()

启动阶段

从这里开始,进入spring boot项目的启动阶段。这个start方法中开启了一个新的线程,然后进行一些Eureka Server的初始化工作,比如调用eurekaServerBootstrap的contextInitialized方法,进入EurekaServerBootstrap.contextInitialized(ServletContext context)方法:

EurekaServerBootstrap
public void contextInitialized(ServletContext context) {
 try {
  // 初始化Eureka的环境变量,在当前的版本中,什么也没做
  initEurekaEnvironment();
  // 初始化Eureka的上下文
  initEurekaServerContext();

  context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
 }
 catch (Throwable e) {
  log.error("Cannot bootstrap eureka server :", e);
  throw new RuntimeException("Cannot bootstrap eureka server :", e);
 }
}

进入initEurekaServerContext()方法:

protected void initEurekaServerContext() throws Exception {
 // For backward compatibility
 JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
 XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

 if (isAws(this.applicationInfoManager.getInfo())) {
  this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry,
    this.applicationInfoManager);
  this.awsBinder.start();
 }

 //初始化eureka server上下文
 EurekaServerContextHolder.initialize(this.serverContext);

 log.info("Initialized server context");

 // Copy registry from neighboring eureka node
 // 1.从相邻的eureka集群节点复制注册表,返回服务实例注册总数
 int registryCount = this.registry.syncUp();
 // 2.默认每30秒发送心跳,1分钟就是2次
 // 修改eureka状态为up 
 // 调用父类的postInit 开启一个剔除定时任务,每隔60执行一次,从当前服务清单中把超时(默认90秒)没有续约的服务剔除
 this.registry.openForTraffic(this.applicationInfoManager, registryCount);

 // Register all monitoring statistics.
 EurekaMonitors.registerAllStats();
}
  1. 进入PeerAwareInstanceRegistryImpl.syncUp()方法:
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;
 
 //如果eureka.client.register-with-eureka=false, 那么serverConfig.getRegistrySyncRetries()返回0,如果为true,默认返回5(重试次数)
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
             //默认休眠30秒
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        //通过DiscoveryClient中的成员localRegionApps中获取已注册的应用
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                 //非来自亚马逊的服务,都返回true
                    if (isRegisterable(instance)) {
                     //将其他节点的实例注册到本节点
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

该方法中的eurekaClient.getApplications()获取集群中的其他节点的所有服务实例。然后遍历获取到的apps,根据isRegisterable(instance)判断是否可注册,如果可以注册就调用register(instance, instance.getLeaseInfo().getDurationInSecs(), true)进行注册,注册实质就是往AbstractInstanceRegistry的属性private final ConcurrentHashMap<String, Map<String, Lease <InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();中加入服务实例信息。这里在服务端服务注册的时候会详细介绍。

  1. 进入PeerAwareInstanceRegistryImpl.openForTraffic(this.applicationInfoManager, registryCount)方法:
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // 预期发送续约的客户端数,默认至少有一个
    this.expectedNumberOfClientsSendingRenews = count;
    // 更新每分钟最小续约阈值(自我保护阈值)
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    //设置实例状态为up
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    //开启一个剔除定时任务,每隔60执行一次,从当前服务清单中把超时(默认90秒)没有续约的服务剔除。
    super.postInit();
}

进入父类AbstractInstanceRegistrypostInit()方法:

protected void postInit() {
 //启动一个定时器默认延时1分钟执行,而后每隔一分钟,周期性执行,暂时不知道作用是什么
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    //设置剔除任务EvictionTask
    evictionTaskRef.set(new EvictionTask());
    //每隔60s执行一次EvictionTask的run方法
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}

进入AbstractInstanceRegistry#EvictionTaskrun方法:

@Override
public void run() {
    try {
     //计算补偿时间
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        //服务剔除
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}

进入evict方法:

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

 // 是否需要开启自我保护机制,如果需要,那么直接return, 不需要继续往下执行了(eureka服务自我保护机制的核心方法,下面会介绍,先留个印象)
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    //遍历服务注册表,刷选出已经超时未续约的服务,加入到expiredLeases列表中
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    // 计算本地注册表服务实例总数(包含服务端和客户端实例)
    int registrySize = (int) getLocalRegistrySize();
    // 服务注册阈值 服务实例总数*0.85
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    // 最多可以剔除多少个服务
    int evictionLimit = registrySize - registrySizeThreshold;

 // 服务实例过期数与最多剔除数比较,取最小值
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        //遍历应剔除数
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            //从已过期服务实例列表中,随机剔除一个服务实例
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            //标识服务实例状态为DOWN,内部会调用服务下线的逻辑,这一块,在下一篇讲解。
            internalCancel(appName, id, false);
        }
    }
}

执行AbstractInstanceRegistry.evict(),剔除逻辑:主要的功能是将注册表registry,其实就是一个ConcurrentHashMap的所有注册实例遍历下,看哪些是过期的,过期了就加入到expiredLeases中,这里并不会剔除全部过期服务实例,而是至多清理15%的服务,然后遍历最大剔除数,随机从expiredLeases中选择一个,执行internalCancel方法把实例状态修改成DELETED状态,这样客户端就拿不到。

DefaultEurekaServerContext

EurekaServerAutoConfiguration类,我们可以看到有个初始化EurekaServerContext的方法:

@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry,
  PeerEurekaNodes peerEurekaNodes)
 
{
 return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes,
   this.applicationInfoManager);
}

DefaultEurekaServerContext 这个类里面的的initialize()方法是被@PostConstruct 这个注解修饰的, 在bean被初始化之前会被调用:

@PostConstruct
@Override
public void initialize() {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    try {
        registry.init(peerEurekaNodes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    logger.info("Initialized");
}

peerEurekaNodes.start()主要是启动一个只拥有一个线程的线程池,第一次进去会更新一下集群其他节点信息 然后启动了一个定时线程,每10分钟更新一次,也就是说后续可以根据配置动态的修改节点配置。(原生的spring cloud config支持),进入PeerEurekaNodes的start()方法:

public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
     // 首次进来,更新集群节点信息
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        //周期性的执行更新集群节点信息任务,默认10分钟执行一次。
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  {}", node.getServiceUrl());
    }
}

Eureka的自我保护机制的原理

Eureka Server在运行期间会去统计心跳失败的比例在15分钟之内是否低于85% , 如果低于85%,Eureka Server会认为当前实例的客户端与自己的心跳连接出现了网络故障,那么Eureka Server会把这些实例保护起来,让这些实例不会过期导致实例剔除。这样做的目的是为了减少网络不稳定或者网络分区的情况下,Eureka Server将健康服务剔除下线的问题。 使用自我保护机制可以使得Eureka 集群更加健壮和稳定的运行。进入自我保护状态后,会出现以下几种情况:

  • Eureka Server不再从注册列表中移除因为长时间没有收到心跳而应该剔除的过期服务
  • Eureka Server仍然能够接受新服务的注册和查询请求,但是不会被同步到其他节点上,保证当前节点依然可用。
自我保护机制的开启

eureka.server.enable-self-preservation=true 默认就是开启状态

在Eureka的自我保护机制中,有两个很重要的变量,Eureka的自我保护机制,都是围绕这两个变量来实现的,在AbstractInstanceRegistry这个类中定义的:

//每分钟最小续约数量
protected volatile int numberOfRenewsPerMinThreshold; 
//预期每分钟收到续约的 客户端数量,取决于注册到eureka server上的服务数量
protected volatile int expectedNumberOfClientsSendingRenews; 

进入AbstractInstanceRegistryupdateRenewsPerMinThreshold方法(这个方法是eureka服务端自我保护机制的核心):

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

numberOfRenewsPerMinThreshold 表示每分钟的最小续约数量,它表示意思就是EurekaServer期望每分钟收到客户端实例续约的总数的阈值。如果小于这个阈值,就会触发自我保护机制。它是在以下代码中赋值的; 自我保护阀值 = 服务总数 * 每分钟续约数(60S/客户端续约间隔) * 自我保护续约百分比阀值因子。其中:

  • expectedNumberOfClientsSendingRenews 表示预期客户端续约总数 动态值
  • getExpectedClientRenewalIntervalSeconds,客户端的续约间隔,默认为30s
  • getRenewalPercentThreshold,自我保护续约百分比阈值因子,默认0.85

举个例子: 假设有10个客户端,发送心跳间隔为30秒,那么一分钟,如果全部正常的话,服务端应该收到20次心跳,那么如果服务端一分钟内收到的心跳数小于20*0.85,即小于17个的时候,就会触发自我保护机制

需要注意的是,这两个变量是动态更新的,有四个地方来更新这两个值;下面说下这4种情况:

  1. 在Eureka初始化时EurekaServerBootstrap这个类中,有一个 initEurekaServerContext 方法,在这个方法中会调用PeerAwareInstanceRegistryImpl.openForTraffic方法,上面已经介绍过了,此处摘取部分代码:
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
  1. PeerAwareInstanceRegistryImpl.cancel方法:

当服务提供者主动下线时,表示这个时候Eureka要剔除这个服务提供者的地址,同时也代表这这个心跳续约的阈值要发生变化。所以在 PeerAwareInstanceRegistryImpl.cancel 中可以看到数据的更新;调用路径 PeerAwareInstanceRegistryImpl.cancel -> AbstractInstanceRegistry.cancel->internalCancel


//省略...

synchronized (lock) {
    if (this.expectedNumberOfClientsSendingRenews > 0) {
        // Since the client wants to cancel it, reduce the number of clients to send renews.
        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
        updateRenewsPerMinThreshold();
    }
}
  1. PeerAwareInstanceRegistryImpl.register方法:

当有新的服务提供者注册到eureka-server上时,需要增加续约的客户端数量,所以在register方法中会进行处理register ->(AbstractInstanceRegistry)super.register(...)

//省略...

// The lease does not exist and hence it is a new registration
synchronized (lock) {
    if (this.expectedNumberOfClientsSendingRenews > 0) {
        // Since the client wants to register it, increase the number of clients sending renews
        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
        updateRenewsPerMinThreshold();
    }
}
logger.debug("No previous lease information found; it is new registration");

//省略...
  1. PeerAwareInstanceRegistryImpl.scheduleRenewalThresholdUpdateTask方法:

15分钟运行一次,判断在15分钟之内心跳失败比例是否低于85%。在DefaultEurekaServerContext 》@PostConstruct修饰的initialize()方法》PeerAwareInstanceRegistryImpl.init()

@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    initializedResponseCache();
    scheduleRenewalThresholdUpdateTask();
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}
private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}
private void updateRenewalThreshold() {
    try {
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold or if self preservation is disabled.
            if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                    || (!this.isSelfPreservationModeEnabled())) {
                this.expectedNumberOfClientsSendingRenews = count;
                updateRenewsPerMinThreshold();
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}
自我保护机制触发任务

关于eureka服务端的自我保护机制的触发其实就是在PeerAwareInstanceRegistryImpl.isLeaseExpirationEnabled()方法中进行逻辑判断的(在上文中的AbstractInstanceRegistry.evict方法中已经介绍过):

@Override
public boolean isLeaseExpirationEnabled() {
 //是否开启了自我保护机制,默认是true
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        // 自我保护机制关闭,因此允许服务实例过期
        return true;
    }
    
    //计算是否需要开启自我保护,判断最后一分钟收到的心跳续约数量是否大于每分钟最小心跳续约数
    只有当最后一分钟的心跳数小于阈值时,才会触发自我保护机制
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

总结

  • 通过@EnableEurekaServer激活服务端配置

  • DefaultEurekaServerContext实例化的过程中

    1. 通过@PostConstruct注解,更新集群节点信息,然后启动一个定时任务,初始延迟10分钟执行一次,之后每隔10分钟周期性执行更新集群节点信息任务。
    2. 开启一个定时任务,初始延迟15分钟执行一次,之后每隔15分钟周期性的检查:每分钟最小续约数
  • 在容器启动后,通过EurekaServerInitializerConfigurationstart方法,做了两件事情:

    1. 从邻近的eureka集群节点同步服务注册列表到本地
    2. 开启服务剔除定时任务,初始延迟60秒,之后每隔60秒执行一次,将超过90秒没有续约的服务实例剔除,前提是关闭了自我保护机制。

了解源码实现背后的原理,后面我们再看Nacos服务注册的源码就会发现,其实背后的原理都是差不多的,关键在于处理的细节。

下一篇我们将介绍eureka服务端接收客户端的请求后,是怎么处理服务注册、服务续约以及服务下线的,敬请期待!!!

最后,如分析有误,敬请指正!!!

补充

最后补充一张从网上找的流程图(偷个懒)

更多原创文章,请扫码关注我的微信公众号
更多原创文章,请扫码关注我的微信公众号

lyq

2021/11/12  阅读:17  主题:默认主题

作者介绍

lyq