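Eureka is an open-source service registration and discovery product from Netflix. It exposes service registration and service discovery as RESTful services and ships with a corresponding Java client. The simplified architecture is as follows: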


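Eureka Server: the server side. It provides the registration service and keeps the information of all registered services in memory.

Eureka Client: the client side, which provides the basic services. By role, clients fall into two kinds:

  Provider: a service provider. It registers with the server, renews its lease, and deregisters, and it exposes services for others to call.

  Consumer: a service consumer. It discovers service information from the server in order to consume the services offered by providers.

In general, a Eureka Client is both a provider and a consumer: it offers services to others and also consumes the services of other providers.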

1. Development

1.1 Eureka Server

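Eureka Server offers registration to providers and discovery to consumers as RESTful services. All registered application instances are kept in memory, so to keep the registry highly available a cluster is usually set up. The following shows how to write and configure a single registry server.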

1.1.1 Single-node configuration

pom

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>

java

@SpringCloudApplication
@EnableEurekaServer
public class Application{
    public static void main(String[] args){
        SpringApplication.run(Application.class, args);
    }
}

Note: @SpringCloudApplication combines @SpringBootApplication, @EnableDiscoveryClient, and @EnableCircuitBreaker, so service discovery and the circuit breaker are enabled automatically.

application.properties

server.port=8000
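# A standalone server neither registers itself with a registry nor fetches the registry
eureka.client.register-with-eureka=false
eureka.client.fetch-registry=false
# Registry address; separate multiple addresses with commas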
eureka.client.serviceUrl.defaultZone=http://localhost:${server.port}/eureka/

1.1.2 High availability

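A single registry usually cannot meet the high-availability requirements of multiple business systems, so the servers are clustered. Take three servers as an example: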


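Eureka Server has no master/slave distinction: each server in the cluster simply registers itself, as a client, with the other registry servers. For example, the server listening on port 8080 points at the other two: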

eureka.client.serviceUrl.defaultZone=http://localhost:8081/eureka/,http://localhost:8082/eureka/
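Each server therefore needs the list of its peers. The following minimal sketch shows, in plain Java, how a comma-separated defaultZone value can be turned into the peer list for one node by dropping that node's own URL. PeerUrls and peersOf are hypothetical names for illustration, not Eureka APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Eureka): turns a defaultZone value into the
// list of peer URLs for one server, excluding that server's own URL.
final class PeerUrls {
    private PeerUrls() {}

    static List<String> peersOf(String defaultZone, String selfUrl) {
        String self = normalize(selfUrl);
        List<String> peers = new ArrayList<>();
        for (String raw : defaultZone.split(",")) {
            String url = normalize(raw);
            // Skip empty entries and the node's own address.
            if (!url.isEmpty() && !url.equals(self)) {
                peers.add(url);
            }
        }
        return peers;
    }

    // Trim whitespace and make sure a non-empty URL ends with '/'.
    private static String normalize(String url) {
        String u = url.trim();
        if (u.isEmpty()) {
            return u;
        }
        return u.endsWith("/") ? u : u + "/";
    }
}
```

With the three-server setup, the node on port 8080 ends up with the 8081 and 8082 URLs as its peers. Normalizing the trailing slash keeps "…/eureka" and "…/eureka/" from being treated as different servers.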
1.2 Eureka Client

pom

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>

java

@SpringCloudApplication
@EnableEurekaClient
public class Application{
    public static void main(String[] args){
        SpringApplication.run(Application.class, args);
    }
}

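Note: the difference between @EnableEurekaClient and @EnableDiscoveryClient is that @EnableDiscoveryClient comes from spring-cloud-commons and works with whichever discovery implementation is on the classpath (Eureka, Consul, Zookeeper, and so on), whereas @EnableEurekaClient comes from spring-cloud-netflix and only works with Eureka.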

application.properties

server.port=8000
# Registry addresses; separate multiple addresses with commas
eureka.client.serviceUrl.defaultZone=http://localhost:8080/eureka/,http://localhost:8081/eureka/,http://localhost:8082/eureka/

2. How Eureka works

Note: the source excerpts below may omit irrelevant parts and are not complete source code.

2.1 How Eureka Server starts up

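In the simple server setup above, all we did was add the eureka-server dependency, set a few parameters in the configuration file, and put @EnableEurekaServer on the startup class. We did very little ourselves, so how does Eureka Server actually start, and what does it do during startup? Let's start from the @EnableEurekaServer annotation.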

First, the source of @EnableEurekaServer. Its @Import annotation shows that it imports EurekaServerMarkerConfiguration.

/**
 * Annotation to activate Eureka Server related configuration {@link EurekaServerAutoConfiguration}
 */

@EnableDiscoveryClient
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}

EurekaServerMarkerConfiguration is a configuration class that creates a Marker bean for EurekaServerAutoConfiguration to use. Marker is an empty class, and its purpose is not yet clear at this point.

/**
 * Responsible for adding in a marker bean to activate
 * {@link EurekaServerAutoConfiguration}
 */
@Configuration
public class EurekaServerMarkerConfiguration {

    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    class Marker {
    }
}

Next, EurekaServerAutoConfiguration. This configuration class creates many beans, such as EurekaServerConfig, PeerAwareInstanceRegistry, EurekaServerContext, and EurekaServerBootstrap. The server certainly depends on these instances during startup, but there is no startup code here. The class also imports EurekaServerInitializerConfiguration, so that is what we analyze next.

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    /**
     * List of packages containing Jersey resources required by the Eureka server
     */
    private static String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
            "com.netflix.eureka" };

    @Autowired
    private ApplicationInfoManager applicationInfoManager;

    @Autowired
    private EurekaServerConfig eurekaServerConfig;

    @Autowired
    private EurekaClientConfig eurekaClientConfig;

    @Autowired
    private EurekaClient eurekaClient;

    @Autowired
    private InstanceRegistryProperties instanceRegistryProperties;

    public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();

    @Bean
    public HasFeatures eurekaServerFeature() {
        return HasFeatures.namedFeature("Eureka Server",
                EurekaServerAutoConfiguration.class);
    }

    @Configuration
    protected static class EurekaServerConfigBeanConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
            EurekaServerConfigBean server = new EurekaServerConfigBean();
            if (clientConfig.shouldRegisterWithEureka()) {
                // Set a sensible default if we are supposed to replicate
                server.setRegistrySyncRetries(5);
            }
            return server;
        }
    }



    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
            ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs) {
        return new PeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }


    @Bean
    public FilterRegistrationBean traceFilterRegistration(
            @Qualifier("webRequestLoggingFilter") Filter filter) {
        FilterRegistrationBean bean = new FilterRegistrationBean();
        bean.setFilter(filter);
        bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10);
        return bean;
    }
}

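The source of EurekaServerInitializerConfiguration is shown below. Its start method launches a thread, and inside that thread is an important line, log.info("Started Eureka Server"). The call right before that log line, eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext), is very likely what starts the server.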

@Configuration
@CommonsLog
public class EurekaServerInitializerConfiguration
        implements ServletContextAware, SmartLifecycle, Ordered {

    @Override
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //TODO: is this class even needed now?
                    eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                    log.info("Started Eureka Server");

                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    EurekaServerInitializerConfiguration.this.running = true;
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                }
                catch (Exception ex) {
                    // Help!
                    log.error("Could not initialize Eureka servlet context", ex);
                }
            }
        }).start();
    }

}

Next, EurekaServerBootstrap. Its contextInitialized method calls initEurekaEnvironment and initEurekaServerContext, which initialize the server environment and the server context respectively.

@CommonsLog
public class EurekaServerBootstrap {

    protected EurekaServerConfig eurekaServerConfig;

    protected ApplicationInfoManager applicationInfoManager;

    protected EurekaClientConfig eurekaClientConfig;

    protected PeerAwareInstanceRegistry registry;

    protected volatile EurekaServerContext serverContext;
    protected volatile AwsBinder awsBinder;

    public EurekaServerBootstrap(ApplicationInfoManager applicationInfoManager,
            EurekaClientConfig eurekaClientConfig, EurekaServerConfig eurekaServerConfig,
            PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
        this.applicationInfoManager = applicationInfoManager;
        this.eurekaClientConfig = eurekaClientConfig;
        this.eurekaServerConfig = eurekaServerConfig;
        this.registry = registry;
        this.serverContext = serverContext;
    }

    public void contextInitialized(ServletContext context) {
        try {
            initEurekaEnvironment();
            initEurekaServerContext();

            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

    protected void initEurekaEnvironment() throws Exception {
        log.info("Setting the eureka configuration..");

        String dataCenter = ConfigurationManager.getConfigInstance()
                .getString(EUREKA_DATACENTER);
        if (dataCenter == null) {
            log.info(
                    "Eureka data center value eureka.datacenter is not set, defaulting to default");
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
        }
        else {
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
        }
        String environment = ConfigurationManager.getConfigInstance()
                .getString(EUREKA_ENVIRONMENT);
        if (environment == null) {
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
            log.info(
                    "Eureka environment value eureka.environment is not set, defaulting to test");
        }
        else {
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
        }
    }

    protected void initEurekaServerContext() throws Exception {
        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);

        if (isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                    this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }

        EurekaServerContextHolder.initialize(this.serverContext);

        log.info("Initialized server context");

        // Copy registry from neighboring eureka node
        int registryCount = this.registry.syncUp();
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);

        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }

}
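The defaulting logic in initEurekaEnvironment is easy to miss among the Archaius calls. This minimal sketch reproduces it with java.util.Properties in place of ConfigurationManager. EurekaEnvironmentDefaults is a hypothetical class; the eureka.* keys and the "default"/"test" fallbacks come from the log messages above, while the archaius.* key strings are assumptions about the constants' values.

```java
import java.util.Properties;

// Hypothetical sketch of initEurekaEnvironment()'s defaulting, using
// java.util.Properties instead of Archaius' ConfigurationManager.
final class EurekaEnvironmentDefaults {
    static final String EUREKA_DATACENTER = "eureka.datacenter";
    static final String EUREKA_ENVIRONMENT = "eureka.environment";
    // Assumed values of the ARCHAIUS_DEPLOYMENT_* constants.
    static final String ARCHAIUS_DEPLOYMENT_DATACENTER = "archaius.deployment.datacenter";
    static final String ARCHAIUS_DEPLOYMENT_ENVIRONMENT = "archaius.deployment.environment";

    private EurekaEnvironmentDefaults() {}

    static void apply(Properties config) {
        // Data center: fall back to "default" when eureka.datacenter is unset.
        String dataCenter = config.getProperty(EUREKA_DATACENTER);
        config.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER,
                dataCenter == null ? "default" : dataCenter);

        // Environment: fall back to "test" when eureka.environment is unset.
        String environment = config.getProperty(EUREKA_ENVIRONMENT);
        config.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT,
                environment == null ? "test" : environment);
    }
}
```

In other words, unless eureka.environment is set explicitly, the server runs with environment "test" and data center "default".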

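Finally, we need to find out when EurekaServerInitializerConfiguration#start is called. Searching for callers turns up no explicit call chain, so when is start invoked? Note that start carries an @Override annotation and the class implements three interfaces, so the method must come from one of them. Following the hierarchy upward reveals the answer: SmartLifecycle. After the Spring container has loaded and initialized all beans, it calls back the start() method of beans that implement this interface (when isAutoStartup() returns true), and here start() launches our service asynchronously.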

We can now summarize how Eureka Server starts. @EnableEurekaServer imports EurekaServerMarkerConfiguration, which creates the Marker bean. That satisfies the condition @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) on EurekaServerAutoConfiguration, so EurekaServerAutoConfiguration is loaded and creates the beans that EurekaServerInitializerConfiguration needs. Because EurekaServerInitializerConfiguration implements SmartLifecycle, the container calls its start method during startup, which in turn initializes the Eureka Server environment and context objects.
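The chain summarized above can be modelled without Spring. In this toy sketch, ToyContainer, ToyMarker and ToyLifecycle are hypothetical stand-ins for the Spring container, the Marker bean and SmartLifecycle. The model starts the lifecycle bean synchronously, whereas the real start() spawns a thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the Eureka Server startup chain; none of these types are Spring classes.
interface ToyLifecycle {             // plays the role of SmartLifecycle
    boolean isAutoStartup();
    void start();
}

final class ToyMarker {}             // plays the role of EurekaServerMarkerConfiguration.Marker

final class ToyContainer {
    final Map<String, Object> beans = new LinkedHashMap<>();
    final List<String> log = new ArrayList<>();

    void refresh(boolean enableEurekaServer) {
        // 1. @EnableEurekaServer -> @Import(EurekaServerMarkerConfiguration) -> Marker bean
        if (enableEurekaServer) {
            beans.put("eurekaServerMarkerBean", new ToyMarker());
        }
        // 2. The auto-configuration applies only if a Marker bean exists
        //    (@ConditionalOnBean) and contributes the initializer bean.
        boolean markerPresent = beans.values().stream().anyMatch(b -> b instanceof ToyMarker);
        if (markerPresent) {
            beans.put("eurekaServerInitializerConfiguration", new ToyLifecycle() {
                @Override public boolean isAutoStartup() { return true; }
                @Override public void start() { log.add("Started Eureka Server"); }
            });
        }
        // 3. Once every bean exists, auto-startup lifecycle beans are started.
        for (Object bean : beans.values()) {
            if (bean instanceof ToyLifecycle && ((ToyLifecycle) bean).isAutoStartup()) {
                ((ToyLifecycle) bean).start();
            }
        }
    }
}
```

Without the annotation no Marker exists, the conditional configuration is skipped, and nothing is started, which is exactly why the empty Marker class works as an on/off switch.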

2.2 How Eureka Client starts up

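Following the same approach as for the server, let's look at how the client starts. Begin with @EnableEurekaClient, the annotation that enables the Eureka client, and see what its source contains. As the source below shows, it is itself annotated with @EnableDiscoveryClient.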

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableDiscoveryClient
public @interface EnableEurekaClient {

}

Since @EnableDiscoveryClient is used, let's look at it next. The important part is that it imports the EnableDiscoveryClientImportSelector class.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    /**
     * If true, the ServiceRegistry will automatically register the local server.
     */
    boolean autoRegister() default true;
}

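What does EnableDiscoveryClientImportSelector do? Apart from adding AutoServiceRegistrationConfiguration when autoRegister is true, it contains nothing that looks like start or init, so let's move up to its parent class, SpringFactoryImportSelector.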

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
        extends SpringFactoryImportSelector<EnableDiscoveryClient> {

    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        String[] imports = super.selectImports(metadata);

        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));

        boolean autoRegister = attributes.getBoolean("autoRegister");

        if (autoRegister) {
            List<String> importsList = new ArrayList<>(Arrays.asList(imports));
            importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
            imports = importsList.toArray(new String[0]);
        }

        return imports;
    }


}

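SpringFactoryImportSelector does not seem to contain any startup or initialization statements either, and above it there are only interfaces, so the trail seems to go cold. However, these are not ordinary interfaces but Spring interfaces, so let's look at what they are for. The two interfaces ending in Aware (BeanClassLoaderAware and EnvironmentAware) only let the class receive objects from the Spring container. That leaves DeferredImportSelector. Its role is similar to the @Import annotation: it chooses which classes to load depending on the situation, and its core method is selectImports. The selected classes are then loaded by org.springframework.context.annotation.ConfigurationClassParser#processDeferredImportSelectors, so selectImports is where classes are chosen for loading.

The comment on the statement List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader))); says that it finds all possible auto-configuration classes and filters out duplicates. Are these the configuration classes that set up the Eureka client? The Javadoc of loadFactoryNames reads "Load the fully qualified class names of factory implementations of the given type from META-INF/spring.factories", which means the configuration classes are loaded from spring.factories.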

@CommonsLog
public abstract class SpringFactoryImportSelector<T>
        implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {

    private ClassLoader beanClassLoader;

    private Class<T> annotationClass;

    private Environment environment;

    @SuppressWarnings("unchecked")
    protected SpringFactoryImportSelector() {
        this.annotationClass = (Class<T>) GenericTypeResolver
                .resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class);
    }

    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        if (!isEnabled()) {
            return new String[0];
        }
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(this.annotationClass.getName(), true));

        Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
                + metadata.getClassName() + " annotated with @" + getSimpleName() + "?");

        // Find all possible auto configuration classes, filtering duplicates
        List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
                .loadFactoryNames(this.annotationClass, this.beanClassLoader)));

        if (factories.isEmpty() && !hasDefaultFactory()) {
            throw new IllegalStateException("Annotation @" + getSimpleName()
                    + " found, but there are no implementations. Did you forget to include a starter?");
        }

        if (factories.size() > 1) {
            // there should only ever be one DiscoveryClient, but there might be more than
            // one factory
            log.warn("More than one implementation " + "of @" + getSimpleName()
                    + " (now relying on @Conditionals to pick one): " + factories);
        }

        return factories.toArray(new String[factories.size()]);
    }


}

The following is the content of a spring.factories file (this one is from spring-cloud-commons):

# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.CommonsClientAutoConfiguration,\
org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration,\
org.springframework.cloud.client.hypermedia.CloudHypermediaAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration,\
org.springframework.cloud.client.serviceregistry.ServiceRegistryAutoConfiguration,\
org.springframework.cloud.commons.util.UtilAutoConfiguration,\
org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration


# Environment Post Processors
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.client.HostInfoEnvironmentPostProcessor
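SpringFactoryImportSelector passes the annotation class itself to loadFactoryNames, so the key it looks up is the fully qualified name of @EnableDiscoveryClient (org.springframework.cloud.client.discovery.EnableDiscoveryClient), not the EnableAutoConfiguration key in the listing above. The sketch below models the combined behavior of SpringFactoryImportSelector and EnableDiscoveryClientImportSelector for a single file, using java.util.Properties, which understands the # comments and backslash line continuations that spring.factories uses. FactoriesSketch is a hypothetical class; it does not merge files from multiple jars as SpringFactoriesLoader does.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical single-file model of SpringFactoryImportSelector +
// EnableDiscoveryClientImportSelector.
final class FactoriesSketch {
    static final String AUTO_REGISTRATION =
            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration";

    private FactoriesSketch() {}

    static List<String> selectImports(String factoriesContent, String key, boolean autoRegister) {
        Properties props = new Properties();
        try {
            // Handles '#' comments and '\' line continuations.
            props.load(new StringReader(factoriesContent));
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        // LinkedHashSet drops duplicates while keeping declaration order.
        Set<String> names = new LinkedHashSet<>();
        for (String name : props.getProperty(key, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        List<String> imports = new ArrayList<>(names);
        // EnableDiscoveryClientImportSelector appends this when autoRegister = true.
        if (autoRegister) {
            imports.add(AUTO_REGISTRATION);
        }
        return imports;
    }
}
```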

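To find out which class is actually loaded, set a breakpoint in selectImports and debug it: the class returned is org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration.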

Next, look at org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration. This should look familiar: a Marker appears again, so some other class must use it as a switch, and that class is EurekaClientAutoConfiguration.

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@CommonsLog
public class EurekaDiscoveryClientConfiguration {

    class Marker {}

    @Bean
    public Marker eurekaDiscoverClientMarker() {
        return new Marker();
    }

}

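EurekaClientAutoConfiguration creates a new CloudEurekaClient(manager, config, this.optionalArgs, this.context) object (the excerpt below only shows the ApplicationInfoManager bean). During construction this client goes through a series of initialization steps and creates the scheduled tasks for registration and lease renewal; deregistration happens when the client shuts down. The following sections cover these in detail.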

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration")
public class EurekaClientAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
    public ApplicationInfoManager eurekaApplicationInfoManager(
            EurekaInstanceConfig config) {
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }

}

2.3 Service registration

First, the client must have eureka.client.register-with-eureka=true, which is the default. When it is set to false, the client does not register.

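The basic metadata a client registers includes the host name, IP address, port, status page URL (/info), and health check URL (/health). Additional metadata can be added under eureka.instance.metadataMap.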

2.3.1 Client registration request

When com.netflix.discovery.DiscoveryClient is initialized, its constructor calls initScheduledTasks(). That method creates an InstanceInfoReplicator instance, which sets up a scheduled task, and then starts that task.

if (clientConfig.shouldRegisterWithEureka()) {
    // InstanceInfo replicator
    // Create the InstanceInfoReplicator instance
    instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize

    // Start the scheduled task
    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
    logger.info("Not registering with Eureka server per configuration");
}

Creating the InstanceInfoReplicator instance creates a thread pool that runs the scheduled task.

InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
    this.discoveryClient = discoveryClient;
    this.instanceInfo = instanceInfo;
    // Create a single-threaded pool (daemon thread) for the scheduled task
    this.scheduler = Executors.newScheduledThreadPool(1,
            new ThreadFactoryBuilder()
                    .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                    .setDaemon(true)
                    .build());
}

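The scheduled task refreshes the local instance info, registers if the info is dirty, and always schedules its next run in the finally block: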

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // Register
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

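start() then schedules the first run after initialDelayMs (despite the name, the value is scheduled with TimeUnit.SECONDS, so it is in seconds):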

public void start(int initialDelayMs) {
    if (started.compareAndSet(false, true)) {
        instanceInfo.setIsDirty();  // for initial register
        // Use the scheduler pool to run this registration task for the first time
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

Finally, discoveryClient.register() sends the registration:

boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}
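The InstanceInfoReplicator code above does not use scheduleAtFixedRate: each run schedules the next one in its finally block, and registration only happens when the instance info is dirty. The following minimal sketch reproduces that pattern with a ScheduledExecutorService. ReplicatorSketch is hypothetical, uses milliseconds so it can be exercised quickly, and replaces the HTTP registration with a counter.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the InstanceInfoReplicator pattern (not Eureka's class):
// a one-shot task that reschedules itself and registers only when "dirty".
final class ReplicatorSketch implements Runnable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "ReplicatorSketch");
                t.setDaemon(true); // like the real pool, do not block JVM exit
                return t;
            });
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean dirty = new AtomicBoolean(false);
    private final long intervalMs;
    final AtomicInteger registrations = new AtomicInteger();

    ReplicatorSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    void start(long initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            dirty.set(true); // force the initial registration
            scheduler.schedule(this, initialDelayMs, TimeUnit.MILLISECONDS);
        }
    }

    void markDirty() {
        dirty.set(true);
    }

    @Override
    public void run() {
        try {
            if (dirty.get()) {
                registrations.incrementAndGet(); // stand-in for discoveryClient.register()
                // Cleared only after "registration"; the real class compares dirty
                // timestamps so that a concurrent change is not lost.
                dirty.set(false);
            }
        } finally {
            try {
                scheduler.schedule(this, intervalMs, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // stop() was called: let the chain end.
            }
        }
    }

    void stop() {
        scheduler.shutdownNow();
    }

    // Demo helper: wait until at least n registrations happened or the timeout passes.
    boolean awaitRegistrations(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (registrations.get() < n && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return registrations.get() >= n;
    }
}
```

Rescheduling from finally keeps the chain alive even if a registration attempt throws, which is the design choice the original run() method relies on.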

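When a provider registers, it should be configured with the addresses of all registry servers. Once registration succeeds with one of them, the provider does not register with the others; the servers synchronize the registration among themselves. If registration with the current server fails, the provider tries the other servers.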
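The failover rule in this paragraph amounts to "try the configured servers in order and stop at the first success". A minimal sketch, assuming a caller-supplied tryRegister function in place of the real HTTP call; the actual client handles retries inside its transport layer, which is not reproduced here, and FailoverRegistration is a hypothetical name.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: register with the first server that accepts.
final class FailoverRegistration {
    private FailoverRegistration() {}

    /** Returns the URL that accepted the registration, or null if every server failed. */
    static String registerWithFirstAvailable(List<String> serverUrls, Predicate<String> tryRegister) {
        for (String url : serverUrls) {
            try {
                if (tryRegister.test(url)) {
                    // Stop here: the servers replicate the registration among themselves.
                    return url;
                }
            } catch (RuntimeException e) {
                // Treat a transport error like a failed attempt and try the next server.
            }
        }
        return null;
    }
}
```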

2.3.2 Server-side registration handling

On the server side, com.netflix.eureka.resources.ApplicationResource exposes an endpoint for adding an instance:

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {

    registry.register(info, "true".equals(isReplication));
    // Return 204 on success
    return Response.status(204).build();  // 204 to be backwards compatible
}

It then calls register. If isReplication is true, the registration came from another peer as a replication event, so it is not replicated to other peers again:

/**
 * Registers the information about the {@link InstanceInfo} and replicates
 * this information to all peer eureka nodes. If this is replication event
 * from other replica nodes then it is not replicated.
 *
 * @param info
 *            the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 *            true if this is a replication event from other replica nodes,
 *            false otherwise.
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // The default lease duration is 90 s
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // Use the lease duration carried in the registration, if present
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register locally
    super.register(info, leaseDuration, isReplication);
    // Replicate to the other nodes
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
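The isReplication check is what keeps registrations from bouncing between peers forever. In this toy model, ToyPeerNode is hypothetical and calls its peers directly instead of sending HTTP requests with the replication header; a node forwards a registration only when it did not itself receive it as a replication.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of peer replication with an isReplication flag.
final class ToyPeerNode {
    final String name;
    final List<ToyPeerNode> peers = new ArrayList<>();
    final List<String> registered = new ArrayList<>();

    ToyPeerNode(String name) {
        this.name = name;
    }

    void register(String instanceId, boolean isReplication) {
        registered.add(instanceId);
        if (isReplication) {
            return; // came from a peer: forwarding again would make nodes loop
        }
        for (ToyPeerNode peer : peers) {
            peer.register(instanceId, true); // forwarded copies are marked as replication events
        }
    }
}
```

Registering on any one of three fully connected nodes leaves exactly one copy on each node; without the flag, the nodes would keep forwarding the same registration to each other.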

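After receiving the registration metadata, the server stores it in a two-level map, ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>. The first-level key is the service name (spring.application.name) and the second-level key is the instance ID.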

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());

        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);

    } finally {
        read.unlock();
    }
}
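The two-level layout can be sketched with plain ConcurrentHashMaps. In this minimal model, ToyRegistry and its Lease are hypothetical and much simpler than Eureka's Lease<InstanceInfo>: computeIfAbsent does in one call what the get/putIfAbsent sequence above does, and a re-registration keeps the earlier service-up timestamp. Unlike Eureka, the sketch sets serviceUpTimestamp at registration time and omits the read lock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the registry: appName -> (instanceId -> lease).
final class ToyRegistry {
    static final class Lease {
        final String instanceId;
        final long registeredAt;
        long serviceUpTimestamp;

        Lease(String instanceId, long registeredAt) {
            this.instanceId = instanceId;
            this.registeredAt = registeredAt;
            this.serviceUpTimestamp = registeredAt; // simplification, see lead-in
        }
    }

    private final ConcurrentHashMap<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();

    void register(String appName, String instanceId, long now) {
        // First level: create the per-application map atomically if it is missing.
        Map<String, Lease> gMap = registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>());
        Lease lease = new Lease(instanceId, now);
        Lease existing = gMap.get(instanceId);
        if (existing != null) {
            // Re-registration keeps the original service-up time.
            lease.serviceUpTimestamp = existing.serviceUpTimestamp;
        }
        gMap.put(instanceId, lease);
    }

    Lease get(String appName, String instanceId) {
        Map<String, Lease> gMap = registry.get(appName);
        return gMap == null ? null : gMap.get(instanceId);
    }

    int instanceCount(String appName) {
        Map<String, Lease> gMap = registry.get(appName);
        return gMap == null ? 0 : gMap.size();
    }
}
```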

2.4 Lease renewal

2.4.1 Client renewal request

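After registering successfully, the client sends a renewal to the server at a fixed interval so that its lease does not expire and it is not evicted. By default it renews every 30 s, and a lease is valid for 90 s.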
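The 30 s / 90 s defaults mean a lease survives a couple of missed heartbeats. This minimal model (ToyLease is hypothetical) treats a lease as expired once more than 90 s have passed since the last renewal. The real server only removes expired leases when its periodic eviction task runs and may skip eviction in self-preservation mode, so actual removal can happen later.

```java
// Hypothetical model of lease timing with the defaults from the text:
// renew every 30 s, lease valid for 90 s after the last renewal.
final class ToyLease {
    static final long RENEWAL_INTERVAL_MS = 30_000;
    static final long DURATION_MS = 90_000;

    private long lastRenewalMs;

    ToyLease(long nowMs) {
        this.lastRenewalMs = nowMs;
    }

    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    // Expired once more than DURATION_MS has passed since the last renewal.
    boolean isExpired(long nowMs) {
        return nowMs - lastRenewalMs > DURATION_MS;
    }

    // Consecutive heartbeats that can be lost before the lease lapses.
    static long toleratedMissedHeartbeats() {
        return DURATION_MS / RENEWAL_INTERVAL_MS - 1;
    }
}
```

With 90 / 30 = 3 renewal intervals per lease, up to two consecutive heartbeats can be lost before the lease lapses.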

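During initialization, com.netflix.discovery.DiscoveryClient creates the heartbeat thread pool, and if the client is configured to register with the server, it starts the heartbeat (lease renewal) task.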

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    try {
        scheduler = Executors.newScheduledThreadPool(3,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    initScheduledTasks();

}


private void initScheduledTasks() {
    if (clientConfig.shouldRegisterWithEureka()) {
        // Heartbeat timer
        // Schedule the first heartbeat
        scheduler.schedule(
                new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs,TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()),
                renewalIntervalInSecs, TimeUnit.SECONDS);
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

// Inner class
private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

// Renew the lease
boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

// com.netflix.discovery.TimedSupervisorTask
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;

        // Initialize the counters and register.
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
}

    public void run() {
        Future future = null;
        try {
            // Submit the renewal task
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
        } catch (TimeoutException e) {
            logger.error("task supervisor timed out", e);
            timeoutCounter.increment();

            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.error("task supervisor rejected the task", e);
            }

            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.error("task supervisor threw an exception", e);
            }

            throwableCounter.increment();
        } finally {
            // Cancel the task if it is still running (for example after a timeout)
            if (future != null) {
                future.cancel(true);
            }
            // Reschedule this supervisor task
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

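The scheduler thread pool runs the TimedSupervisorTask supervisor, which submits the HeartbeatThread renewal task to the heartbeat executor and waits for it with a timeout. Whether the renewal succeeds, fails, or times out, the supervisor then reschedules itself on the scheduler; after a timeout the delay is doubled, up to timeoutMillis * expBackOffBound.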
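The delay handling in TimedSupervisorTask#run is a capped exponential backoff: a timeout doubles the delay up to timeoutMillis * expBackOffBound, a successful run resets it, and other failures leave it unchanged. A minimal sketch of just that bookkeeping (BackoffDelay is hypothetical; the bound of 10 used in the example is an assumption, not a value shown in this article):

```java
// Hypothetical sketch of TimedSupervisorTask's delay bookkeeping.
final class BackoffDelay {
    private final long baseMs;
    private final long maxMs;
    private long currentMs;

    BackoffDelay(long baseMs, int expBackOffBound) {
        this.baseMs = baseMs;
        this.maxMs = baseMs * expBackOffBound;
        this.currentMs = baseMs;
    }

    // Timeout: double the delay, but never beyond the cap.
    long onTimeout() {
        currentMs = Math.min(maxMs, currentMs * 2);
        return currentMs;
    }

    // Success: go back to the base interval.
    long onSuccess() {
        currentMs = baseMs;
        return currentMs;
    }
}
```

With a 30 s base and a bound of 10, successive timeouts give delays of 60 s, 120 s, 240 s and then 300 s, until a heartbeat succeeds again.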


2.4.2 Server-side renewal handling

On the server side, com.netflix.eureka.resources.InstanceResource exposes an endpoint for renewing an instance's lease:

/**
     * A put request for renewing lease from a client instance.
     *
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     * @param overriddenStatus
     *            overridden status if any.
     * @param status
     *            the {@link InstanceStatus} of the instance.
     * @param lastDirtyTimestamp
     *            last timestamp when this instance information was updated.
     * @return response indicating whether the operation was a success or
     *         failure.
     */
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
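The 404 branch above is what tells a client to register again. To my understanding, Eureka's DiscoveryClient.renew reacts to a 404 heartbeat response by re-registering the instance. The minimal sketch below models only that decision; RenewClientSketch and its two functional parameters (one returning the HTTP status of the heartbeat, one performing the re-registration) are hypothetical stand-ins for the real HTTP transport.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Hypothetical sketch of how a client can react to the renewal response.
class RenewClientSketch {
    /**
     * @param sendHeartbeat returns the HTTP status of the renewal PUT (hypothetical transport)
     * @param register      re-registers the instance and reports success
     * @return true if the server now holds a valid lease for this instance
     */
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        int status;
        try {
            status = sendHeartbeat.getAsInt();
        } catch (RuntimeException e) {
            return false;                    // network failure: report it, the next cycle retries
        }
        if (status == 404) {
            return register.getAsBoolean();  // server lost our lease: register again
        }
        return status == 200;
    }

    public static void main(String[] args) {
        System.out.println(renew(() -> 200, () -> true));  // true
        System.out.println(renew(() -> 404, () -> true));  // true, after re-registering
        System.out.println(renew(() -> 404, () -> false)); // false, re-registration failed
        System.out.println(renew(() -> 500, () -> true));  // false
    }
}
```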

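It then calls the renew method of com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl, which delegates to the renew method of its parent class com.netflix.eureka.registry.AbstractInstanceRegistry and, on success, replicates the heartbeat to the other Eureka Server peers: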

public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        // Replicate the heartbeat to the other server peers
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

The renew method of com.netflix.eureka.registry.AbstractInstanceRegistry looks like this:

public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatus(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }
    }
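The core of this method is a lookup in a two-level map (application name, then instance id, then lease) followed by a timestamp update. The sketch below is a hypothetical, heavily simplified model of that structure: it skips metrics, overridden statuses and replication, and takes the current time as a parameter so its behaviour is deterministic. The 90-second lease duration in the example corresponds to Eureka's default lease expiration duration; the expiry rule here is deliberately simpler than the one in Eureka's own Lease class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of AbstractInstanceRegistry's renew path.
class RegistrySketch {
    static final class Lease {
        final long durationMs;
        volatile long lastRenewalMs;

        Lease(long durationMs, long nowMs) {
            this.durationMs = durationMs;
            this.lastRenewalMs = nowMs;
        }

        void renew(long nowMs) { lastRenewalMs = nowMs; }

        boolean isExpired(long nowMs) { return nowMs > lastRenewalMs + durationMs; }
    }

    // appName -> (instanceId -> lease), the same shape as the registry field above
    private final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();

    void register(String appName, String id, long durationMs, long nowMs) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(id, new Lease(durationMs, nowMs));
    }

    // false means "unknown instance"; InstanceResource turns that into a 404
    boolean renew(String appName, String id, long nowMs) {
        Map<String, Lease> gMap = registry.get(appName);
        Lease lease = (gMap == null) ? null : gMap.get(id);
        if (lease == null) {
            return false;
        }
        lease.renew(nowMs);
        return true;
    }

    boolean isExpired(String appName, String id, long nowMs) {
        Map<String, Lease> gMap = registry.get(appName);
        Lease lease = (gMap == null) ? null : gMap.get(id);
        return lease == null || lease.isExpired(nowMs);
    }

    public static void main(String[] args) {
        RegistrySketch r = new RegistrySketch();
        r.register("ORDER-SERVICE", "host1:8080", 90_000, 0);
        System.out.println(r.renew("ORDER-SERVICE", "host1:8080", 30_000));      // true
        System.out.println(r.renew("ORDER-SERVICE", "host2:8080", 30_000));      // false
        System.out.println(r.isExpired("ORDER-SERVICE", "host1:8080", 100_000)); // false
        System.out.println(r.isExpired("ORDER-SERVICE", "host1:8080", 120_001)); // true
    }
}
```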

2.5 Service Deregistration

[Figure: cancel]

2.5.1 Client Requests Deregistration

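The shutdown method of com.netflix.discovery.DiscoveryClient is the entry point for deregistration. Because it is annotated with @PreDestroy, it runs when the DiscoveryClient is destroyed by its container, typically when the application shuts down. It first stops the scheduled registration and renewal tasks, then sends a cancel request to the registry.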

@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        // Stop the scheduled registration and renewal tasks
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            // Deregister from the server
            unregister();
        }

        logger.info("Completed shut down of DiscoveryClient");
    }
}

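// Stop the scheduled tasks
private void cancelScheduledTasks() {
    // Stop the instance-info replication (registration) task
    if (instanceInfoReplicator != null) {
        instanceInfoReplicator.stop();
    }
    // Stop the heartbeat (renewal) executor immediately
    if (heartbeatExecutor != null) {
        heartbeatExecutor.shutdownNow();
    }
    // Stop the scheduler immediately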
    if (scheduler != null) {
        scheduler.shutdownNow();
    }
}

// Deregister
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient
                    .cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + appPathIdentifier + " - deregister  status: " + httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e);
        }
    }
}
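The order of operations in shutdown matters: if a heartbeat reached the server after the cancel, the server would answer 404 (see renewLease above) and the client would try to register again. The hypothetical sketch below isolates that ordering plus the "run only once" guard; ShutdownSketch and the sendCancel callback (a stand-in for the real cancel request) are assumptions for illustration, not Eureka classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the ordering in DiscoveryClient.shutdown().
class ShutdownSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler;
    private final ExecutorService heartbeatExecutor;
    private final Runnable sendCancel; // stands in for the cancel request to the server

    ShutdownSketch(ScheduledExecutorService scheduler, ExecutorService heartbeatExecutor,
                   Runnable sendCancel) {
        this.scheduler = scheduler;
        this.heartbeatExecutor = heartbeatExecutor;
        this.sendCancel = sendCancel;
    }

    synchronized void shutdown() {
        if (!isShutdown.compareAndSet(false, true)) {
            return;                           // already shut down: later calls do nothing
        }
        // 1. stop renewals first so no heartbeat races with the cancel
        heartbeatExecutor.shutdownNow();
        scheduler.shutdownNow();
        // 2. then ask the server to drop this instance
        try {
            sendCancel.run();
        } catch (RuntimeException e) {
            // like unregister(), only log: the server will eventually expire the lease
            System.err.println("de-registration failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        AtomicInteger cancels = new AtomicInteger();
        ShutdownSketch client = new ShutdownSketch(
                Executors.newSingleThreadScheduledExecutor(),
                Executors.newSingleThreadExecutor(),
                cancels::incrementAndGet);
        client.shutdown();
        client.shutdown();                    // second call is ignored
        System.out.println("cancel requests sent: " + cancels.get()); // 1
    }
}
```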

2.5.2 Server Receives Deregistration

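com.netflix.eureka.resources.InstanceResource exposes the deregistration endpoint. On receiving a cancel request, it calls the cancel method of PeerAwareInstanceRegistryImpl, which calls the cancel method of its parent class AbstractInstanceRegistry to remove the instance's registration and invalidate the response cache. On success, PeerAwareInstanceRegistryImpl also replicates the cancel to the other peers and lowers the renewal threshold used for self-preservation.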

/**
 * Handles cancellation of leases for this particular instance.
 *
 * @param isReplication
 *            a header parameter containing information whether this is
 *            replicated from other nodes.
 * @return response indicating whether the operation was a success or failure.
 */
@DELETE
public Response cancelLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication));

    if (isSuccess) {
        logger.debug("Found (Cancel): " + app.getName() + " - " + id);
        return Response.ok().build();
    } else {
        logger.info("Not Found (Cancel): " + app.getName() + " - " + id);
        return Response.status(Status.NOT_FOUND).build();
    }
}

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl

/*
 * (non-Javadoc)
 *
 * @see com.netflix.eureka.registry.InstanceRegistry#cancel(java.lang.String,
 * java.lang.String, long, boolean)
 */
@Override
public boolean cancel(final String appName, final String id, final boolean isReplication) {
    if (super.cancel(appName, id, isReplication)) {
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
        synchronized (lock) {
            if (this.expectedNumberOfRenewsPerMin > 0) {
                // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds,
                // 2 for a minute)
                this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
                this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin
                        * serverConfig.getRenewalPercentThreshold());
            }
        }
        return true;
    }
    return false;
}
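A quick worked example of this threshold arithmetic: each instance renews every 30 seconds, i.e. twice a minute, hence the -2 per cancelled instance. The sketch assumes 10 registered instances and uses 0.85, the default value of eureka.server.renewal-percent-threshold (what getRenewalPercentThreshold() returns unless configured otherwise). ThresholdSketch is a hypothetical helper, not part of Eureka.

```java
// Hypothetical helper reproducing the threshold arithmetic from cancel() above.
class ThresholdSketch {
    static final double RENEWAL_PERCENT_THRESHOLD = 0.85; // assumed default value

    // one renewal every 30 seconds means 2 renewals per instance per minute
    static int expectedRenewsPerMin(int instances) {
        return instances * 2;
    }

    // same cast-to-int truncation as the Eureka code
    static int threshold(int expectedRenewsPerMin) {
        return (int) (expectedRenewsPerMin * RENEWAL_PERCENT_THRESHOLD);
    }

    public static void main(String[] args) {
        int expected = expectedRenewsPerMin(10);
        System.out.println(threshold(expected)); // 17
        expected -= 2;                           // one instance cancels
        System.out.println(threshold(expected)); // 15
    }
}
```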

com.netflix.eureka.registry.AbstractInstanceRegistry

/**
 * Cancels the registration of an instance.
 *
 * <p>
 * This is normally invoked by a client when it shuts down informing the server
 * to remove the instance from traffic.
 * </p>
 *
 * @param appName
 *            the application name of the application.
 * @param id
 *            the unique identifier of the instance.
 * @param isReplication
 *            true if this is a replication event from other nodes, false
 *            otherwise.
 * @return true if the instance was removed from the
 *         {@link AbstractInstanceRegistry} successfully, false otherwise.
 */
@Override
public boolean cancel(String appName, String id, boolean isReplication) {
    return internalCancel(appName, id, isReplication);
}

/**
 * {@link #cancel(String, String, boolean)} method is overridden by
 * {@link PeerAwareInstanceRegistry}, so each cancel request is replicated to
 * the peers. This is however not desired for expires which would be counted in
 * the remote peers as valid cancellations, so self preservation mode would not
 * kick-in.
 */
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        read.lock();
        CANCEL.increment(isReplication);
        // Remove the instance's lease from the registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        synchronized (recentCanceledQueue) {
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        }

        // No lease exists for this instance
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // Invalidate the cached responses for this app and its VIP addresses
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}
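Stripped of locks and metrics, internalCancel does three things: remove the lease from the two-level map, record the change for clients that fetch deltas, and invalidate the cached responses. The sketch below models just those steps with hypothetical names (CancelSketch, recentlyChanged, responseCache). The instance payload is a plain string and the cache is a simple map keyed by application name; both are simplifying assumptions, not how Eureka stores them.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model of internalCancel (no locks, metrics or VIP handling).
class CancelSketch {
    // appName -> (instanceId -> instance payload)
    private final Map<String, Map<String, String>> registry = new ConcurrentHashMap<>();
    // change log that delta fetches would read
    final Queue<String> recentlyChanged = new ConcurrentLinkedQueue<>();
    // cached, pre-serialized responses keyed by appName
    final Map<String, String> responseCache = new ConcurrentHashMap<>();

    void register(String appName, String id, String payload) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, payload);
    }

    boolean cancel(String appName, String id) {
        Map<String, String> gMap = registry.get(appName);
        String removed = (gMap == null) ? null : gMap.remove(id);
        if (removed == null) {
            return false;                        // no lease: InstanceResource answers 404
        }
        recentlyChanged.add("DELETED " + appName + "/" + id);
        responseCache.remove(appName);           // next read rebuilds without this instance
        return true;
    }

    public static void main(String[] args) {
        CancelSketch s = new CancelSketch();
        s.register("ORDER-SERVICE", "host1:8080", "UP");
        s.responseCache.put("ORDER-SERVICE", "<cached payload>");
        System.out.println(s.cancel("ORDER-SERVICE", "host1:8080")); // true
        System.out.println(s.cancel("ORDER-SERVICE", "host1:8080")); // false
        System.out.println(s.recentlyChanged);                       // [DELETED ORDER-SERVICE/host1:8080]
        System.out.println(s.responseCache.isEmpty());               // true
    }
}
```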

3. Eureka vs. ZooKeeper

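First, a word on the CAP theorem. CAP stands for Consistency, Availability and Partition tolerance. A distributed system cannot guarantee all three at once; because network partitions cannot be ruled out in practice, a system has to choose between consistency and availability when a partition occurs.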

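  • Consistency: all nodes see the same data; every update is propagated synchronously, so a read always returns the most recent write
  • Availability: when some nodes in the cluster fail, the cluster as a whole can still respond to client read and write requests
  • Partition tolerance: the system keeps operating when network failures split the cluster into groups of nodes that cannot reach each other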

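Eureka's cluster has no master/slave distinction: all nodes are peers, and any one of them can serve requests. So when one or more servers fail, the remaining servers continue to serve. When a failed server recovers, Eureka brings it back into the cluster, and all it has to do is sync a copy of the registry from another node. Even in the extreme case where every server is down, clients still hold a cached copy of the registry and can look up services from it.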

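Eureka registrations are kept alive by client heartbeats. When a network failure makes the number of renewals received in the last minute drop below the threshold (numberOfRenewsPerMinThreshold, computed from the renewal percent threshold as in the cancel code above), Eureka enters self-preservation mode: it stops expiring instances even though their heartbeats are missing, and keeps their registrations until the network recovers. Meanwhile, new services can still register.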
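The self-preservation decision boils down to a single comparison. The helper below is a hypothetical sketch of my reading of PeerAwareInstanceRegistryImpl's isLeaseExpirationEnabled check; the parameter names are illustrative, and the example threshold assumes 10 instances and the default 0.85 percentage.

```java
// Hypothetical helper modelling the self-preservation check (names are illustrative).
class SelfPreservationSketch {
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            int renewsLastMin, int renewsThreshold) {
        if (!selfPreservationEnabled) {
            return true;   // self-preservation switched off: always expire stale leases
        }
        // expire stale leases only while enough heartbeats are still arriving
        return renewsThreshold > 0 && renewsLastMin > renewsThreshold;
    }

    public static void main(String[] args) {
        int threshold = (int) (10 * 2 * 0.85); // 10 instances, 2 renews/min each -> 17
        System.out.println(isLeaseExpirationEnabled(true, 20, threshold)); // true: normal operation
        System.out.println(isLeaseExpirationEnabled(true, 8, threshold));  // false: self-preservation
        System.out.println(isLeaseExpirationEnabled(false, 8, threshold)); // true
    }
}
```

During a partition, most heartbeats stop arriving at once, so renewsLastMin collapses below the threshold and expiry is switched off instead of wiping out the whole registry.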

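ZooKeeper, by contrast, is a distributed coordination service whose job is to keep data synchronized and strongly consistent across all nodes. It achieves this with a leader/follower model: every Follower holds a replica of the Leader's data, and all updates are forwarded to the Leader, which applies them. Losing a Follower is usually harmless, but when the Leader fails, ZooKeeper must elect a new one and cannot serve requests during the election. Once more than half of the nodes are down, no candidate can obtain a majority of the votes, and the ensemble stops serving requests altogether. In CAP terms, ZooKeeper therefore favors consistency (CP), while Eureka favors availability (AP).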
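The "more than half" rule translates into simple majority-quorum arithmetic. QuorumSketch is a hypothetical helper that just tabulates, for a few ensemble sizes, how many votes a leader needs and how many node failures the ensemble survives.

```java
// Hypothetical helper for majority-quorum arithmetic in a ZooKeeper-style ensemble.
class QuorumSketch {
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    static int toleratedFailures(int ensembleSize) {
        return (ensembleSize - 1) / 2;
    }

    // the ensemble can elect a leader and serve only while a majority is alive
    static boolean canServe(int ensembleSize, int failedNodes) {
        return ensembleSize - failedNodes >= majority(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 6; n++) {
            System.out.printf("ensemble=%d majority=%d tolerated failures=%d%n",
                    n, majority(n), toleratedFailures(n));
        }
    }
}
```

A 4-node ensemble tolerates no more failures than a 3-node one (both survive only 1), which is why ensembles are usually sized with an odd number of nodes.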
