1、服务端
org.springframework.cloud spring-cloud-starter-netflix-eureka-server
spring-cloud-netflix-eureka-server中spring.factories中EurekaServerAutoConfiguration
/** * Register the Jersey filter. * @param eurekaJerseyApp an {@link Application} for the filter to be registered * @return a jersey {@link FilterRegistrationBean} */ @Bean public FilterRegistrationBean> jerseyFilterRegistration( javax.ws.rs.core.Application eurekaJerseyApp) {FilterRegistrationBeanbean = new FilterRegistrationBean (); bean.setFilter(new ServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); bean.setUrlPatterns( Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); return bean; }
1.1、服务注册
ApplicationResource类的addInstance方法接收请求,在对实例的信息进行验证后,向服务注册中心添加实例。
/注意是POST方法
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } //进入InstanceRegistry的register registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
进入InstanceRegistry的register在这里做了两个功能
@Override public void register(final InstanceInfo info, final boolean isReplication) {//在方法中使用publishEvent发布了监听事件 。Spring支持事件驱动,可以监听者模式进行事件的监听,这里广播给所有监听者,收到一个服务注册的请求。至于监听器,可以由我们自己手写实现,参数中的事件类型spring会帮我们直接注入 handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication); //调用父类PeerAwareInstanceRegistryImpl的register方法 super.register(info, isReplication); } @Component public class EurekaRegisterListener { @EventListener public void registe(EurekaInstanceRegisteredEvent event){ System.out.println(event.getInstanceInfo().getAppName()); } }
父类PeerAwareInstanceRegistryImpl的register方法
public void register(final InstanceInfo info, final boolean isReplication) { //拿到微服务的过期时间,并进行更新 int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } //将服务注册交给父类完成 父类AbstractInstanceRegistry的register方法,在这开始真正开始做服务注册 super.register(info, leaseDuration, isReplication); //完成集群信息同步 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
AbstractInstanceRegistry的register方法
AbstractInstanceRegistry这个类中定义的Eureka-server的服务注册列表的结构 ConcurrentHashMap中外层的String表示服务名称; Map中的String表示服务节点的id (也就是实例的instanceid); Lease是一个心跳续约的对象,InstanceInfo表示实例信息。 private final ConcurrentHashMap>> registry = new ConcurrentHashMap >>();
1、将新的实例写入注册表的数据map中
2、清理缓存
3、主动失效读写缓存里的数据
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); Map> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); //首先,注册表根据微服务的名称或取Map,如果不存在就新建,使用putIfAbsent。 if (gMap == null) { //1、将新的实例写入注册表的数据map中 final ConcurrentHashMap > gNewMap = new ConcurrentHashMap >(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease //然后,从gMap(gMap就是该服务的实例列表)获取一次服务实例,判断这个微服务的节点是否存在,第一次注册的情况下一般是不存在的。当然,也有可能会发生注册信息冲突时,这时Eureka会根据最后活跃时间来判断到底覆盖哪一个: if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); } //这段代码中,Eureka拿到存在节点的最后活跃时间,和当前注册节点的发起注册时间,进行对比。当存在的节点的最后活跃时间大于当前注册节点的时间,就说明之前存在的节点更活跃,就替换当前节点。 这里有一个思想,就是如果Eureka缓存的老节点更活跃,就说明它能够使用,而新来的服务我并不知道是否能用,那么Eureka就保守的使用了可用的老节点,从这一点也保证了可用性。 在拿到服务实例后对其进行封装: Lease lease = new Lease (registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } //Lease是一个心跳续约的包装类,里面存放了注册信息,最后操作时间,注册时间,过期时间,剔除时间等信息。在这里把注册实例及过期时间放到这个心跳续约对象中,再把心跳续约对象放到gmap注册表中去。之后进行改变服务状态,系统数据统计,至此一个服务注册的流程就完成了。注册完成后,查看一下registry中的服务实例,发现我们启动的Eureka-client都已经放在里面了: gMap.put(registrant.getId(), lease); recentRegisteredQueue.add(new Pair ( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); // This is where the initial state transfer of overridden status happens if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); //2、清理缓存 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }
com.netflix.eureka.registry.AbstractInstanceRegistry#invalidateCache
private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { // invalidate cache responseCache.invalidate(appName, vipAddress, secureVipAddress); }
com.netflix.eureka.registry.ResponseCacheImpl#invalidate
@Override public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { for (Key.KeyType type : Key.KeyType.values()) { for (Version v : Version.values()) { invalidate( new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact) ); if (null != vipAddress) { invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full)); } if (null != secureVipAddress) { invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full)); } } } } //主动失效读写缓存里的数据 public void invalidate(Key... keys) { for (Key key : keys) { logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(key); CollectionkeysWithRegions = regionSpecificKeys.get(key); if (null != keysWithRegions && !keysWithRegions.isEmpty()) { for (Key keysWithRegion : keysWithRegions) { logger.debug("Invalidating the response cache key : {} {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(keysWithRegion); } } } }
1.2、服务续约
InstanceRegistry # renew() ->
PeerAwareInstanceRegistry # renew()->
AbstractInstanceRegistry # renew()
先从注册表获取该服务的实例列表(gMap),再从gMap中通过实例的id 获取具体的 要续约的实例。之后根据服务实例的InstanceStatus判断是否处于宕机状态,以及是否和之前状态相同。如果一切状态正常,最终调用Lease中的renew方法:
可以看出,其实服务续约的操作非常简单,它的本质就是修改服务的最后的更新时间。将最后更新时间改为系统当前时间加上服务的过期时间。
值得提一下的是,lastUpdateTimestamp这个变量是被volatile关键字修饰的。
public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); Map> gMap = registry.get(appName); Lease leaseToRenew = null; if (gMap != null) { leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId()); instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } renewsLastMin.increment(); leaseToRenew.renew(); return true; } }
1.3、服务剔除
当Eureka-server发现有的实例没有续约超过一定时间,则将该服务从注册列表剔除,该项工作由一个定时任务完成的。该任务的定义过程比较复杂,仅列出其调用过程:
EurekaServerInitializerConfiguration # start() ->
EurekaServerBootstrap # contextInitialized() ->
# initEurekaServerContext() ->
PeerAwareInstanceRegistryImpl # openForTraffic() ->
AbstractInstanceRegistry # postInit()
在AbstractInstanceRegistry的postInit方法中,定义EvictionTask定时任务,构建定时器启动该任务,执行任务中剔除方法 evict()。
private long evictionIntervalTimerInMs = 60 * 1000;
任务的时间被定义为60秒,即默认每分钟执行一次。
具体查看evit()剔除方法:
protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }
public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // We collect first all expired items, to evict them in random order. For large eviction sets, // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it, // the impact should be evenly distributed across all applications. //新建实例列表expiredLeases,用来存放过期的实例。 List> expiredLeases = new ArrayList<>(); //遍历registry注册表,对实例进行检测工作,使用isExpired方法判断实例是否过期 //evictionTimestamp:剔除时间,当剔除节点的时候,将系统当前时间赋值给这个 evictionTimestamp //additionalLeaseMs:集群同步产生的预留时间,这个时间是程序中传过来的 for (Entry >> groupEntry : registry.entrySet()) { Map > leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry > leaseEntry : leaseMap.entrySet()) { Lease lease = leaseEntry.getValue(); if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. Without that we would wipe out full registry. int registrySize = (int) getLocalRegistrySize(); //根据阈值计算可以被剔除的服务数量最大值 int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); //剔除后剩余最小数量 int evictionLimit = registrySize - registrySizeThreshold; //expiredLeases.size() 剔除列表的数量 int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); //上面的代码中根据自我保护机制进行了判断,使用Min函数计算两者的最小值,剔除较小数量的服务实例。举个例子,假如当前共有100个服务,那么剔除阈值为85,如果list中有60个服务,那么就会剔除该60个服务。但是如果list中有95个服务,那么只会剔除其中的85个服务,在这种情况下,又会产生一个问题,eureka-server该如何判断去剔除哪些服务,保留哪些服务呢? Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); //这里使用了随机算法进行剔除,保证不会连续剔除某个微服务的全部实例。最终调用internalCancel方法,实际执行剔除。 internalCancel(appName, id, false); } } }
系统当前时间 >最后更新时间 + 过期时间 + 预留时间
这里进行判断:
系统当前时间 >最后更新时间 + 过期时间 + 预留时间
当该条件成立时,认为服务过期。在Eureka中过期时间默认定义为3个心跳的时间,一个心跳是30秒,因此过期时间是90秒。
当以上两个条件之一成立时,判断该实例过期,将该过期实例放入上面创建的列表中。注意这里仅仅是将实例放入List中,并没有实际剔除。
在实际剔除任务前,需要提一下eureka的自我保护机制:
当15分钟内,心跳失败的服务大于一定比例时,会触发自我保护机制。
这个值在Eureka中被定义为85%,一旦触发自我保护机制,Eureka会尝试保护其服务注册表中的信息,不再删除服务注册表中的数据。
1.4、服务下线
客户端发送http请求告诉eureka-server自己下线,调用 AbstractInstanceRegistry 中 cancel方法:
@Override public boolean cancel(String appName, String id, boolean isReplication) { return internalCancel(appName, id, isReplication); }
最终还是调用了和服务剔除中一样的方法,remove掉了gMap中的实例。
1.5、集群信息同步
集群信息同步发生在Eureka-server之间,之前提到在PeerAwareInstanceRegistryImpl类中,在执行register方法注册微服务实例完成后,执行了集群信息同步方法replicateToPeers,具体分析一下该方法:
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } //这 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: //这 node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } finally { CurrentRequestVersion.remove(); } }
首先,遍历集群节点,用以给各个集群信息节点进行信息同步。
然后,调用replicateInstanceActionsToPeers方法,在该方法中根据具体的操作类型Action,选择分支,最终调用PeerEurekaNode的register方法:
public void register(final InstanceInfo info) throws Exception { long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); batchingDispatcher.process( taskId("register", info), new InstanceReplicationTask(targetHost, Action.Register, info, null, true) { public EurekaHttpResponseexecute() { //z这 return replicationClient.register(info); } }, expiryTime ); }
最终发送http请求,但是与普通注册操作不同的时,这时将集群同步的标识置为true,说明注册信息是来自集群同步。
在注册过程中运行到addInstance方法时,单独注册时isReplication的值为false,集群同步时为true。通过该值,能够避免集群间出现死循环,进行循环同步的问题。
1.6、获取注册信息接口
com.netflix.eureka.resources.ApplicationsResource#getContainers
1、从缓存中获取responseCache.get(cacheKey)
2、首先从只读缓存里获取
@GET public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); } // Check if the server allows the access to the registry. The server can // restrict access if it is not // ready to serve traffic depending on various reasons. if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { return Response.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); Response response; if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { //1、从缓存中获取responseCache.get(cacheKey) response = Response.ok(responseCache.get(cacheKey)) .build(); } CurrentRequestVersion.remove(); return response; }
com.netflix.eureka.registry.ResponseCacheImpl#get(com.netflix.eureka.registry.Key)
public String get(final Key key) { //shouldUseReadOnlyResponseCache = true return get(key, shouldUseReadOnlyResponseCache); } @VisibleForTesting String get(final Key key, boolean useReadOnlyCache) { Value payload = getValue(key, useReadOnlyCache); if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) { return null; } else { return payload.getPayload(); } } @VisibleForTesting Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; try { if (useReadOnlyCache) { //2、首先从只读缓存里获取 final Value currentPayload = readOnlyCacheMap.get(key); if (currentPayload != null) { payload = currentPayload; } else { //3、只读缓存中没有就从读写缓存中获取 //读写缓存默认180秒会自动过期 payload = readWriteCacheMap.get(key); //回写只读缓存 //timeSchedule每隔30秒执行getCacheUpdate定时任务将读写缓存中的数据更新到只读缓存中 readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } } catch (Throwable t) { logger.error("Cannot get value for key : {}", key, t); } return payload; }
ResponseCacheImpl
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(); this.registry = registry; long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); //读写缓存 this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) //180秒自动过期 .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListener() { @Override public void onRemoval(RemovalNotification notification) { Key removedKey = notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions, removedKey); } } }) .build(new CacheLoader () { @Override public Value load(Key key) throws Exception { if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } Value value = generatePayload(key); return value; } }); //每隔30秒执行getCacheUpdateTask定时任务将读写缓存中数据更新到只读缓存中 if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e); } }