Eureka分为Client端和Server端,Client端向Server端注册自己的服务信息,并且拉取所有服务的注册信息,Server端作为注册中心,负责接收Client端的注册信息,维护所有服务的注册信息,Server端也可以开启集群模式,相互之间同步服务的注册信息。
与缓存相关的三个变量
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
AbstractInstanceRegistry的成员变量,相当于服务端的注册表,维护所有服务的注册信息,第一层Map的key是服务的应用名称,value也是一个map,其中key是实例id,value是实例的详细信息(Lease
private final LoadingCache<Key, ResponseCacheImpl.Value> readWriteCacheMap;
ResponseCacheImpl的成员变量,使用的是guava的LoadingCache构建的缓存,其中key为com.netflix.eureka.registry包下的Key对象,value是ResponseCacheImpl的内部类Value对象,从名称上可以看出它是一个读写缓存。
private final ConcurrentMap<Key, ResponseCacheImpl.Value> readOnlyCacheMap = new ConcurrentHashMap();
ResponseCacheImpl的成员变量,同样是一个ConcurrentMap,从名称上可以看出它是一个只读的缓存对象。
所以Eureka Server使用了一个读写分离的两级缓存机制,registry负责维护所有服务的注册信息,当register数据有变化,将会更新readWriteCacheMap的内容,后台开启定时任务,默认30s从readWriteCacheMap同步数据到readOnlyCacheMap,Eureka Client拉取服务的注册信息时,从readOnlyCacheMap读取数据,并没有直接从readWriteCacheMap中获取数据:
Client从Server端拉取数据流程:
(1)Client端DiscoveryClient的构造函数中,会初始化定时任务;
(2)缓存刷新任务,定时发送请求到Server端,拉取服务的注册数据,Server端收到请求,首先会根据请求信息构建key,根据key从缓存中获取数据;
(3)Server端根据key获取缓存数据时,会判断是否开启了使用readOnly缓存:
如果开启:先从readOnlyCacheMap获取,如果从readOnlyCacheMap未获取到数据,再从readWriteCacheMap中查找;
如果未开启:直接从readWriteCacheMap中获取;
(4)从readWriteCacheMap获取数据也有两种结果:
第一:获取到数据,如果开启了readOnly缓存,会将结果设置到readOnlyCacheMap中,否则直接返回结果即可;
第二:未获取到数据,由于readWriteCacheMap使用的是guava构建的缓存,从readWriteCacheMap根据key获取数据,假如key不存在的活,就会触发load方法,在这个方法中会调用generatePayload方法从registry中获取数据构并建缓存数据返回;
(5)Server端将结果设置到response返回到Client端;
AbstractInstanceRegistry
以register服务注册方法看一下缓存更新的流程,在register方法中,服务进行注册的时候,会根据服务的应用名称appName,调用invalidateCache方法清除之前存在的缓存。
public abstract class AbstractInstanceRegistry implements InstanceRegistry{
// server端的注册表,key是服务的应用名称,value又是一个map, key是实例id,value是实例的详细信息(Lease<InstanceInfo>)
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();
...
// 注册服务
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
this.read.lock();
// 根据应用名称从注册表中获取实例
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
EurekaMonitors.REGISTER.increment(isReplication);
// 如果获取的实例为空
if(gMap == null) {
// 创建一个hashmap
ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
// 加入注册表
gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
if(gMap == null) {
gMap = gNewMap;
}
}
// 根据实例ID获取,实例的详细信息
Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
...
registrant.setActionType(ActionType.ADDED);
this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 清除缓存
this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), Boolean.valueOf(isReplication)});
} finally {
this.read.unlock();
}
}
// 清除缓存
private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
this.responseCache.invalidate(appName, vipAddress, secureVipAddress);
}
}
ResponseCacheImpl
(1)ResponseCacheImpl中包含了readOnlyCacheMap只读缓存对象和readWriteCacheMap读写缓存对象,其中readWriteCacheMap使用的是guava的缓存机制,如果根据key从readWriteCacheMap获取数据时没有这个key,将会调用load方法,在load方法中是调用generatePayload构建Value对象的,在generatePayload是从registry获取数据的,也就是说当readWriteCacheMap不存在某个key是会从registry获取数据。
(2)在构造函数中,初始化了readWriteCacheMap对象,并判断是否使用readOnlyCacheMap缓存,如果开启,设置定时任务getCacheUpdateTask,定时从readWriteCacheMap同步数据到readOnlyCacheMap,同步方式是遍历readOnlyCacheMap,判断value是否与readWriteCacheMap的数据一致,如果不一致,以readWriteCacheMap的数据为准,更新readOnlyCacheMap的数据。
(3)invalidate方法用来清除readWriteCacheMap中的缓存.
public class ResponseCacheImpl implements ResponseCache {
...
// readOnlyCacheMap缓存
private final ConcurrentMap<Key, ResponseCacheImpl.Value> readOnlyCacheMap = new ConcurrentHashMap();
// guava构建的readWriteCacheMap缓存
private final LoadingCache<Key, ResponseCacheImpl.Value> readWriteCacheMap;
...
// 构造函数
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
// 是否使用只读缓存
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
// 缓存更新间隔
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
// 构建缓存对象,初始容量为1000
this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(1000).expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS).removalListener(new RemovalListener<Key, ResponseCacheImpl.Value>() {
public void onRemoval(RemovalNotification<Key, ResponseCacheImpl.Value> notification) {
Key removedKey = (Key)notification.getKey();
if(removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
ResponseCacheImpl.this.regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
}).build(new CacheLoader<Key, ResponseCacheImpl.Value>() {
// 如果缓存中key不存在,会调用load方法
public ResponseCacheImpl.Value load(Key key) throws Exception {
if(key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
ResponseCacheImpl.this.regionSpecificKeys.put(cloneWithNoRegions, key);
}
// 根据key构建缓存数据
ResponseCacheImpl.Value value = ResponseCacheImpl.this.generatePayload(key);
return value;
}
});
// 如果使用只读缓存
if(this.shouldUseReadOnlyResponseCache) {
// 注册定时任务,同步数据
this.timer.schedule(this.getCacheUpdateTask(), new Date(System.currentTimeMillis() / responseCacheUpdateIntervalMs * responseCacheUpdateIntervalMs + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable var7) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", var7);
}
}
// 清除缓存
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
KeyType[] var4 = KeyType.values();
int var5 = var4.length;
for(int var6 = 0; var6 < var5; ++var6) {
KeyType type = var4[var6];
Version[] var8 = Version.values();
int var9 = var8.length;
for(int var10 = 0; var10 < var9; ++var10) {
Version v = var8[var10];
// 实际上调用的invalidate(Key... keys)方法
this.invalidate(new Key[]{new Key(EntityType.Application, appName, type, v, EurekaAccept.full), new Key(EntityType.Application, appName, type, v, EurekaAccept.compact), new Key(EntityType.Application, "ALL_APPS", type, v, EurekaAccept.full), new Key(EntityType.Application, "ALL_APPS", type, v, EurekaAccept.compact), new Key(EntityType.Application, "ALL_APPS_DELTA", type, v, EurekaAccept.full), new Key(EntityType.Application, "ALL_APPS_DELTA", type, v, EurekaAccept.compact)});
if(null != vipAddress) {
this.invalidate(new Key[]{new Key(EntityType.VIP, vipAddress, type, v, EurekaAccept.full)});
}
if(null != secureVipAddress) {
this.invalidate(new Key[]{new Key(EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full)});
}
}
}
}
public void invalidate(Key... keys) {
Key[] var2 = keys;
int var3 = keys.length;
for(int var4 = 0; var4 < var3; ++var4) {
Key key = var2[var4];
logger.debug("Invalidating the response cache key : {} {} {} {}, {}", new Object[]{key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()});
// 根据key清除缓存
this.readWriteCacheMap.invalidate(key);
Collection<Key> keysWithRegions = this.regionSpecificKeys.get(key);
if(null != keysWithRegions && !keysWithRegions.isEmpty()) {
Iterator var7 = keysWithRegions.iterator();
while(var7.hasNext()) {
Key keysWithRegion = (Key)var7.next();
logger.debug("Invalidating the response cache key : {} {} {} {} {}", new Object[]{key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()});
this.readWriteCacheMap.invalidate(keysWithRegion);
}
}
}
}
// 定时任务,从readWriteCacheMap同步数据到readOnlyCacheMap
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
public void run() {
ResponseCacheImpl.logger.debug("Updating the client cache from response cache");
Iterator var1 = ResponseCacheImpl.this.readOnlyCacheMap.keySet().iterator();
// 遍历readOnlyCacheMap
while(var1.hasNext()) {
Key key = (Key)var1.next();
if(ResponseCacheImpl.logger.isDebugEnabled()) {
ResponseCacheImpl.logger.debug("Updating the client cache from response cache for key : {} {} {} {}", new Object[]{key.getEntityType(), key.getName(), key.getVersion(), key.getType()});
}
try {
CurrentRequestVersion.set(key.getVersion());
// 从readWriteCacheMap根据key获取数据
ResponseCacheImpl.Value cacheValue = (ResponseCacheImpl.Value)ResponseCacheImpl.this.readWriteCacheMap.get(key);
// 从readOnlyCacheMap获取数据
ResponseCacheImpl.Value currentCacheValue = (ResponseCacheImpl.Value)ResponseCacheImpl.this.readOnlyCacheMap.get(key);
// 对比数据是否一致
if(cacheValue != currentCacheValue) {
// 如果数据不一致,更新readOnlyCacheMap的数据 ResponseCacheImpl.this.readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable var5) {
ResponseCacheImpl.logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), var5);
}
}
}
};
}
// 根据key生成ResponseCacheImpl.Value对象
private ResponseCacheImpl.Value generatePayload(Key key) {
Stopwatch tracer = null;
ResponseCacheImpl.Value var8;
try {
String payload;
switch(null.$SwitchMap$com$netflix$eureka$registry$Key$EntityType[key.getEntityType().ordinal()]) {
case 1:
boolean isRemoteRegionRequested = key.hasRegions();
// 全量获取
if("ALL_APPS".equals(key.getName())) {
if(isRemoteRegionRequested) {
tracer = this.serializeAllAppsWithRemoteRegionTimer.start();
payload = this.getPayLoad(key, this.registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = this.serializeAllAppsTimer.start();
// 从registry获取数据
payload = this.getPayLoad(key, this.registry.getApplications());
}
} else if("ALL_APPS_DELTA".equals(key.getName())) {// 增量获取
if(isRemoteRegionRequested) {
tracer = this.serializeDeltaAppsWithRemoteRegionTimer.start();
this.versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = this.getPayLoad(key, this.registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = this.serializeDeltaAppsTimer.start();
this.versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = this.getPayLoad(key, this.registry.getApplicationDeltas());
}
} else {
tracer = this.serializeOneApptimer.start();
payload = this.getPayLoad(key, this.registry.getApplication(key.getName()));
}
break;
case 2:
case 3:
tracer = this.serializeViptimer.start();
payload = this.getPayLoad(key, getApplicationsForVip(key, this.registry));
break;
default:
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
}
var8 = new ResponseCacheImpl.Value(payload);
} finally {
if(tracer != null) {
tracer.stop();
}
}
return var8;
}
}
ApplicationsResource
ApplicationsResource的getContainers接收Client端的拉取请求,在getContainers方法中,首先会根据请求信息构建缓存Key,根据key从缓存中获取数据,并设置到response中返回给Client端。
public class ApplicationsResource {
@GET
public Response getContainers(@PathParam("version") String version, @HeaderParam("Accept") String acceptHeader, @HeaderParam("Accept-Encoding") String acceptEncoding, @HeaderParam("X-Eureka-Accept") String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if(!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions);
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
if(!this.registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
} else {
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = KeyType.JSON;
String returnMediaType = "application/json";
if(acceptHeader == null || !acceptHeader.contains("json")) {
keyType = KeyType.XML;
returnMediaType = "application/xml";
}
// 构建缓存key
Key cacheKey = new Key(EntityType.Application, "ALL_APPS", keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);
Response response;
// 如果acceptEncoding是gizp
if(acceptEncoding != null && acceptEncoding.contains("gzip")) {
response = Response.ok(this.responseCache.getGZIP(cacheKey)).header("Content-Encoding", "gzip").header("Content-Type", returnMediaType).build();
} else {
// 从缓存中根据key获取数据设置到response中
response = Response.ok(this.responseCache.get(cacheKey)).build();
}
return response;
}
}
}
回到ResponseCacheImpl,get方法用来根据key从缓存中获取数据,最终调用的是getValue方法,在getValue方法中可以看到,如果开启了只读缓存,先从readOnlyCacheMap中获取数据,如果未获取到再从readWriteCacheMap中获取,如果未开启只读缓存,直接从readWriteCacheMap中获取,如果readWriteCacheMap不存在某个key,可以回看ResponseCacheImpl构造函数初始化readWriteCacheMap时有一个load方法,不存在某个key时触发该方法,实际上是去registry中取数据并构建缓存数据:
public class ResponseCacheImpl implements ResponseCache {
...
// 根据keky获取缓存数据
public String get(Key key) {
return this.get(key, this.shouldUseReadOnlyResponseCache);
}
@VisibleForTesting
String get(Key key, boolean useReadOnlyCache) {
// 根据key从缓存map中获取value
ResponseCacheImpl.Value payload = this.getValue(key, useReadOnlyCache);
return payload != null && !payload.getPayload().equals("")?payload.getPayload():null;
}
@VisibleForTesting
ResponseCacheImpl.Value getValue(Key key, boolean useReadOnlyCache) {
ResponseCacheImpl.Value payload = null;
try {
// 如果使用ReadOnlyCache
if(useReadOnlyCache) {
// 从ReadOnlyCache获取缓存数据
ResponseCacheImpl.Value currentPayload = (ResponseCacheImpl.Value)this.readOnlyCacheMap.get(key);
// 如果获取不为空
if(currentPayload != null) {
payload = currentPayload;
} else {
// 如果获取为空,从readWriteCacheMap中获取数据
payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
// 将获取的数据加入readOnlyCacheMap中
this.readOnlyCacheMap.put(key, payload);
}
} else {
// 如果不使用readOnlyCacheMap,直接从readWriteCacheMap获取数据
payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
}
} catch (Throwable var5) {
logger.error("Cannot get value for key : {}", key, var5);
}
return payload;
}
}
DiscoveryClient
(1)在DiscoveryClient的构造方法中,会判断是否需要拉取数据,并且初始化定时任务;
(2)初始化定时任务方法中,会添加缓存更新的定时任务,是在CacheRefreshThread的run方法中实现的,在run方法中调用refreshRegistry刷新缓存,在refreshRegistry方法中又调用了fetchRegistry,所以最终使用的fetchRegistry方法从服务端拉取数据;
(3)fetchRegistry方法中会判断是全量还是增量从Server端拉取数据;
public class DiscoveryClient implements EurekaClient {
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
...
// 如果需要拉取数据
if(this.clientConfig.shouldFetchRegistry() && !this.fetchRegistry(false)) {
// 从备用服务拉取数据
this.fetchRegistryFromBackup();
}
...
// 初始化定时任务
this.initScheduledTasks();
...
}
// 初始化定时任务
private void initScheduledTasks() {
int renewalIntervalInSecs;
int expBackOffBound;
// 是否需要拉取数据
if(this.clientConfig.shouldFetchRegistry()) {
renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 注册刷新缓存的定时任务
this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
if(this.clientConfig.shouldRegisterWithEureka()) {
renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: renew interval is: {}", Integer.valueOf(renewalIntervalInSecs));
// 注册发送心跳的定时任务
this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread(null)), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
// 构建InstanceInfoReplicator,其run方法中会调用discoveryClient.register()发送HTTP请求进行服务注册
this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
this.statusChangeListener = new StatusChangeListener() {
public String getId() {
return "statusChangeListener";
}
public void notify(StatusChangeEvent statusChangeEvent) {
if(InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {
DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
} else {
DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
}
DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
}
};
if(this.clientConfig.shouldOnDemandUpdateStatusChange()) {
this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
}
this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
// 缓存刷新任务
class CacheRefreshThread implements Runnable {
CacheRefreshThread() {
}
public void run() {
// 调用refreshRegistry刷新缓存
DiscoveryClient.this.refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {
...
// 调用fetchRegistry从服务端拉取数据
boolean success = this.fetchRegistry(remoteRegionsModified);
if(success) {
this.registrySize = ((Applications)this.localRegionApps.get()).size();
this.lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
...
} catch (Throwable var9) {
logger.error("Cannot fetch registry from server", var9);
}
}
// 拉取数据
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = this.FETCH_REGISTRY_TIMER.start();
label122: {
boolean var4;
try {
Applications applications = this.getApplications();
if(!this.clientConfig.shouldDisableDelta() && Strings.isNullOrEmpty(this.clientConfig.getRegistryRefreshSingleVipAddress()) && !forceFullRegistryFetch && applications != null && applications.getRegisteredApplications().size() != 0 && applications.getVersion().longValue() != -1L) {
// 增量拉取数据
this.getAndUpdateDelta(applications);
} else {
logger.info("Disable delta property : {}", Boolean.valueOf(this.clientConfig.shouldDisableDelta()));
logger.info("Single vip registry refresh property : {}", this.clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", Boolean.valueOf(forceFullRegistryFetch));
logger.info("Application is null : {}", Boolean.valueOf(applications == null));
logger.info("Registered Applications size is zero : {}", Boolean.valueOf(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", Boolean.valueOf(applications.getVersion().longValue() == -1L));
// 全量拉取数据
this.getAndStoreFullRegistry();
}
applications.setAppsHashCode(applications.getReconcileHashCode());
this.logTotalInstances();
break label122;
} catch (Throwable var8) {
logger.error("DiscoveryClient_{} - was unable to refresh its cache! status = {}", new Object[]{this.appPathIdentifier, var8.getMessage(), var8});
var4 = false;
} finally {
if(tracer != null) {
tracer.stop();
}
}
return var4;
}
// 缓存刷新事件
this.onCacheRefreshed();
this.updateInstanceRemoteStatus();
return true;
}
}
参考:
[xmz_java:Eureka 缓存结构以及服务感知优化](https://www.cnblogs.com/xmzJava/p/11359636.html)
[宜信技术:详解Eureka 缓存机制](https://www.cnblogs.com/yixinjishu/p/10871243.html)
Spring Cloud版本:Finchley.RELEASE