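This post walks through how Eureka Server handles startup, service registration, heartbeat maintenance, and peer-node synchronization.
Server startup
Starting from the Spring Boot entry point, the @EnableEurekaServer annotation is defined as follows: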
@EnableDiscoveryClient
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({EurekaServerMarkerConfiguration.class})
public @interface EnableEurekaServer {
}
It enables EnableDiscoveryClient by default and imports one more configuration class, EurekaServerMarkerConfiguration.
@Configuration
public class EurekaServerMarkerConfiguration {
    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }
    class Marker {
    }
}
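On its own this class does nothing functional. According to its comments, it only adds a marker bean that is used to activate the Eureka Server configuration; how the activation happens is covered further down.
As we already know, Spring Boot auto-configuration starts from the AutoConfiguration classes listed in the spring.factories file of each dependency that has been added. The spring-cloud-netflix-eureka-server package ships this file as well.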
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
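EurekaServerAutoConfiguration shows that this configuration is activated only when the Marker bean above exists. This is probably meant to keep Eureka Server from being activated implicitly just because the package happens to be on the classpath.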
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter
This class assembles most of the functional beans, including:
- EurekaController
- ServerCodecs
- InstanceRegistry
- PeerEurekaNodes
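- A JAX-RS application instance
- A FilterRegistrationBean that wraps the Application above as a Filter, gives it the lowest priority, and filters every request under the Eureka context.
EurekaController is a simple controller that renders the Eureka dashboard, including information about the node itself and the registrations stored on that node. ServerCodecs provides the JSON and XML codecs.
InstanceRegistry is the key to service registration. It extends PeerAwareInstanceRegistryImpl; its key methods are register, cancel, and renew.
PeerEurekaNodes maintains the peer nodes that the current Eureka Server replicates to, and refreshes the peer node information through a scheduled task.
Service registration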
#+NAME InstanceRegistry.java
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
        implements ApplicationContextAware {
    private ApplicationContext ctxt;
    private int defaultOpenForTrafficCount;
    /**
     * Initializes the parent class, then sets the extra parameters this class needs.
     * Both parameters default to 1; it is not yet clear whether they change
     * automatically once replica mode is enabled.
     */
    public InstanceRegistry(EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig, ServerCodecs serverCodecs,
            EurekaClient eurekaClient, int expectedNumberOfRenewsPerMin,
            int defaultOpenForTrafficCount) {
        super(serverConfig, clientConfig, serverCodecs, eurekaClient);
        this.expectedNumberOfRenewsPerMin = expectedNumberOfRenewsPerMin;
        this.defaultOpenForTrafficCount = defaultOpenForTrafficCount;
    }
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        this.ctxt = context;
    }
    // Handles registration. leaseDuration is the lease of the service instance: 90s by default,
    // with a 30s renewal interval. It is not yet confirmed whether all three parameters are
    // supplied by EurekaClient.
    @Override
    public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
        handleRegistration(info, leaseDuration, isReplication);
        super.register(info, leaseDuration, isReplication);
    }
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
        super.register(info, isReplication);
    }
    /**
     * Wraps the registration in an event and publishes it to the Spring context,
     * so it is still worth checking which listeners subscribe to this event.
     */
    private void handleRegistration(InstanceInfo info, int leaseDuration,
            boolean isReplication) {
        log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
                + ", leaseDuration " + leaseDuration + ", isReplication "
                + isReplication);
        publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
                isReplication));
    }
    private void publishEvent(ApplicationEvent applicationEvent) {
        this.ctxt.publishEvent(applicationEvent);
    }
    private int resolveInstanceLeaseDuration(final InstanceInfo info) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        return leaseDuration;
    }
}
#+NAME PeerAwareInstanceRegistryImpl.java
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register locally through AbstractInstanceRegistry
    super.register(info, leaseDuration, isReplication);
    // Replicate the registration to the other peer nodes
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
private void replicateToPeers(Action action, String appName, String id,
        InstanceInfo info /* optional */,
        InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // A replication received from another node is not forwarded any further
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Otherwise replicate the registration to every currently known peer node
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
#+NAME AbstractInstanceRegistry.java
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // Acquire the read lock for the current thread
        read.lock();
        // Look up the instances already registered for this application
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        // Count the registration, distinguishing replicated ones from those sent directly by a client
        REGISTER.increment(isReplication);
        if (gMap == null) {
            // First time this application is seen: initialize its registry entry, a ConcurrentHashMap
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Look up the existing lease for this instance
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            // If the existing lease's dirty timestamp is later (newer) than the incoming one,
            // keep the existing InstanceInfo as the registrant
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                // Synchronized update of expectedNumberOfRenewsPerMin, whose startup default is 1.
                // A client sends a heartbeat every 30s, so each new instance adds 2 per minute.
                // numberOfRenewsPerMinThreshold is the lower bound on heartbeats received per minute:
                // the expected count multiplied by renewalPercentThreshold, which defaults to 0.85.
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
        }
        // Build the lease object
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // Carry over the service-up timestamp from the existing lease
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // Put this instance's lease into the application's registry entry
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            // Record the registration in a queue so that recent registrations can be queried
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            // The registrant's overriddenStatus is not UNKNOWN
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                // ...and the map holds no overridden status for this instance yet,
                // so store the status from the registration in the map
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            // If the map holds an overridden status for this instance, copy it into the registrant
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }
        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // Invalidate the cache for the app this instance belongs to, so the next request returns fresh content
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
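The threshold bookkeeping inside the synchronized block of this method can be worked through in isolation. The following is a minimal standalone sketch, not Eureka code: the class RenewThresholdSketch and its method names are hypothetical, the starting value of 1 comes from the note in the listing, and 0.85 is the renewal percent threshold mentioned there.

```java
/** Hypothetical standalone model of the renewal-threshold arithmetic; not Eureka code. */
final class RenewThresholdSketch {
    private final double renewalPercentThreshold;
    private int expectedNumberOfRenewsPerMin;
    private int numberOfRenewsPerMinThreshold;

    RenewThresholdSketch(int initialExpected, double renewalPercentThreshold) {
        this.expectedNumberOfRenewsPerMin = initialExpected;
        this.renewalPercentThreshold = renewalPercentThreshold;
    }

    /** Applies the same update the listing performs for a brand-new lease. */
    synchronized void onNewRegistration() {
        if (expectedNumberOfRenewsPerMin > 0) {
            // One heartbeat every 30 seconds means two expected renewals per minute.
            expectedNumberOfRenewsPerMin += 2;
            // The cast truncates toward zero, exactly like the (int) cast in the listing.
            numberOfRenewsPerMinThreshold =
                    (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold);
        }
    }

    synchronized int expectedNumberOfRenewsPerMin() {
        return expectedNumberOfRenewsPerMin;
    }

    synchronized int numberOfRenewsPerMinThreshold() {
        return numberOfRenewsPerMinThreshold;
    }

    public static void main(String[] args) {
        RenewThresholdSketch sketch = new RenewThresholdSketch(1, 0.85);
        for (int i = 1; i <= 10; i++) {
            sketch.onNewRegistration();
            System.out.println(i + " instance(s): expected="
                    + sketch.expectedNumberOfRenewsPerMin()
                    + " threshold=" + sketch.numberOfRenewsPerMinThreshold());
        }
    }
}
```

Under these assumptions, one registration moves the expected count from 1 to 3 and the threshold to 2 (3 × 0.85 = 2.55, truncated).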
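To summarize: when a service registers, Eureka Server first publishes a registration event to the Spring context (for any listener that is interested), then stores the service information in a ConcurrentHashMap that it maintains itself. Finally it checks whether the registration was received directly or replicated from another node (isReplication). If it was received directly, the server also replicates the registration to the other nodes. Those nodes handle it with the same register method, except that for them it is a replicated registration rather than an initial one.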
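The register-then-replicate flow can be sketched with plain Java collections. This is a heavily simplified illustration under stated assumptions, not the Eureka implementation: MiniRegistry, its methods, and the application and instance names are hypothetical, peers are called in-process instead of over HTTP, and event publishing, locking, and status handling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified model of register-then-replicate; not Eureka code. */
final class MiniRegistry {
    // appName -> (instanceId -> lease duration in seconds)
    private final ConcurrentHashMap<String, Map<String, Integer>> registry =
            new ConcurrentHashMap<String, Map<String, Integer>>();
    private final List<MiniRegistry> peers = new ArrayList<MiniRegistry>();
    private int replicationsReceived;

    void addPeer(MiniRegistry peer) {
        peers.add(peer);
    }

    void register(String appName, String instanceId, int leaseDurationSecs, boolean isReplication) {
        // Store locally first, creating the per-application map on first use.
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<String, Integer>())
                .put(instanceId, leaseDurationSecs);
        if (isReplication) {
            // A replicated registration is never forwarded again.
            replicationsReceived++;
            return;
        }
        for (MiniRegistry peer : peers) {
            if (peer != this) { // do not replicate to yourself
                peer.register(appName, instanceId, leaseDurationSecs, true);
            }
        }
    }

    boolean contains(String appName, String instanceId) {
        Map<String, Integer> instances = registry.get(appName);
        return instances != null && instances.containsKey(instanceId);
    }

    int replicationsReceived() {
        return replicationsReceived;
    }

    public static void main(String[] args) {
        MiniRegistry a = new MiniRegistry();
        MiniRegistry b = new MiniRegistry();
        a.addPeer(b);
        b.addPeer(a);
        a.register("ORDERS", "orders-1", 90, false);
        System.out.println("b has instance: " + b.contains("ORDERS", "orders-1"));
        System.out.println("a received replications: " + a.replicationsReceived());
    }
}
```

The isReplication flag is what stops the two nodes from bouncing the same registration back and forth: node b stores the instance but does not send it back to node a.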
Heartbeat maintenance
Heartbeats are handled by the renew method of InstanceRegistry.
#+NAME InstanceRegistry.java
public boolean renew(final String appName, final String serverId,
        boolean isReplication) {
    log("renew " + appName + " serverId " + serverId + ", isReplication {}"
            + isReplication);
    List<Application> applications = getSortedApplications();
    // Find the registered instance whose id matches serverId and publish a renewal event.
    for (Application input : applications) {
        if (input.getName().equals(appName)) {
            InstanceInfo instance = null;
            for (InstanceInfo info : input.getInstances()) {
                if (info.getId().equals(serverId)) {
                    instance = info;
                    break;
                }
            }
            publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                    instance, isReplication));
            break;
        }
    }
    // Delegate to the parent class's renew
    return super.renew(appName, serverId, isReplication);
}
#+NAME PeerAwareInstanceRegistryImpl.java
public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        // If the local renewal succeeded, replicate the heartbeat to the other peer nodes
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}
#+NAME AbstractInstanceRegistry.java
public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // Fetch the registry entry of the application (it contains all of its instances)
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    // Look up the lease of this instance
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            // Work out the overriding status from the current state
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                Object[] args = {
                        instanceInfo.getStatus().name(),
                        instanceInfo.getOverriddenStatus().name(),
                        instanceInfo.getId()
                };
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", args);
                instanceInfo.setStatus(overriddenInstanceStatus);
            }
        }
        // Update the renewal counter
        renewsLastMin.increment();
        // Refresh the lease timestamp
        leaseToRenew.renew();
        return true;
    }
}
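The heartbeat logic largely mirrors registration: the server renews the lease of the corresponding instance and, when the heartbeat came directly from a client rather than from another node, replicates it to the other peer nodes.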
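What "renewing a lease" means can be illustrated with a small timestamp model. This is a hypothetical sketch and not Eureka's Lease class: LeaseSketch and its methods are invented for illustration, the clock is passed in explicitly so the behavior is deterministic, and the 90s lease and 30s heartbeat values come from the defaults mentioned earlier.

```java
/** Hypothetical lease model with an explicit clock; not Eureka's Lease class. */
final class LeaseSketch {
    private final long durationMs;
    private long lastRenewalMs;

    LeaseSketch(long durationSecs, long nowMs) {
        this.durationMs = durationSecs * 1000L;
        this.lastRenewalMs = nowMs;
    }

    /** A heartbeat simply moves the last-renewal timestamp forward. */
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    /** The lease is expired once a full lease duration has passed without a renewal. */
    boolean isExpired(long nowMs) {
        return nowMs > lastRenewalMs + durationMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(90, 0L);
        // Heartbeats at 30s and 60s keep the lease alive.
        lease.renew(30_000L);
        lease.renew(60_000L);
        System.out.println("expired at 120s: " + lease.isExpired(120_000L));
        // With no further heartbeat the lease lapses after 60s + 90s.
        System.out.println("expired at 151s: " + lease.isExpired(151_000L));
    }
}
```

With a 90s lease and a 30s heartbeat, an instance in this model has to miss three consecutive heartbeats before its lease lapses.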
Adding peer nodes
During initialization, the context-building phase calls the start method of PeerEurekaNodes, which sets up the connections to the other peer nodes.
public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }
            }
        };
        // Schedule the task on the single-threaded executor to refresh the peer node list periodically
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL: " + node.getServiceUrl());
    }
}
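updatePeerEurekaNodes compares the URLs from the client configuration with the peer nodes it currently maintains. If any URL was added or removed, it builds a new set of peer nodes; otherwise nothing changes.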
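The comparison described above boils down to two set differences. The sketch below assumes exactly that behavior and is not the actual updatePeerEurekaNodes implementation: PeerDiffSketch, its method names, and the peer URLs are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that diffs the tracked peer URLs against freshly resolved ones. */
final class PeerDiffSketch {
    private PeerDiffSketch() {
    }

    /** URLs that are tracked now but no longer appear in the configuration. */
    static Set<String> toRemove(Collection<String> current, Collection<String> resolved) {
        Set<String> result = new LinkedHashSet<String>(current);
        result.removeAll(resolved);
        return result;
    }

    /** URLs that appear in the configuration but are not tracked yet. */
    static Set<String> toAdd(Collection<String> current, Collection<String> resolved) {
        Set<String> result = new LinkedHashSet<String>(resolved);
        result.removeAll(current);
        return result;
    }

    /** The peer list only needs rebuilding when either difference is non-empty. */
    static boolean changed(Collection<String> current, Collection<String> resolved) {
        return !toRemove(current, resolved).isEmpty() || !toAdd(current, resolved).isEmpty();
    }

    public static void main(String[] args) {
        List<String> current = Arrays.asList("http://peer1/eureka/", "http://peer2/eureka/");
        List<String> resolved = Arrays.asList("http://peer2/eureka/", "http://peer3/eureka/");
        System.out.println("remove: " + toRemove(current, resolved));
        System.out.println("add: " + toAdd(current, resolved));
        System.out.println("changed: " + changed(current, resolved));
    }
}
```

Because the task runs on a fixed delay, a configuration change in this model is picked up on the next run without restarting the server.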